6 Replies Latest reply: Dec 25, 2012 11:02 PM by ManojC RSS

    Multithreading using ExecutorService in Java!!

    ManojC
      Hi,

      In my program I would like to introduce multithreading with executor service and arrayblockingqueue. Once I run the below code for the Consumer pool only one thread was active even thogh the pool size is '10'. Please help me to fine tune this program.

      Code:
      import java.util.ArrayList;
      import java.util.LinkedList;
      import java.util.List;
      import java.util.concurrent.ArrayBlockingQueue;
      import java.util.concurrent.BlockingQueue;
      import java.util.concurrent.Callable;
      import java.util.concurrent.ExecutorService;
      import java.util.concurrent.Executors;
      import java.util.concurrent.LinkedBlockingQueue;
      import java.util.logging.Level;
      import java.util.logging.Logger;


      public class ProducerConsumerPattern
      {
      protected static boolean Done=false;
      public static void main(String args[])
      {
      //Creating shared object
      ArrayBlockingQueue sharedQueue = new ArrayBlockingQueue(100);
      ExecutorService reportExecutor = Executors.newFixedThreadPool(1);
      ExecutorService updateExecutor = Executors.newFixedThreadPool(10);
      Producer producer = new Producer(sharedQueue,Done);
      Consumer consumer = new Consumer(sharedQueue,Done);
      //Creating Producer and Consumer Thread
      reportExecutor.execute(producer);
      updateExecutor.execute(consumer);
      reportExecutor.shutdown();
      if(reportExecutor.isShutdown())
      {
      Done = true;
      }
      updateExecutor.shutdown();
      //Starting producer and Consumer thread
      }
      }
      //Producer Class in java
      class Producer implements Runnable {
      private final BlockingQueue sharedQueue;
      protected boolean Done;


      public Producer(BlockingQueue sharedQueue,boolean Done)
      {
      this.sharedQueue = sharedQueue;
      this.Done = Done;
      }
      public void run() {
      for(int i=0; i<10; i++)
      {
      try {
      ArrayList<String> myArr = new ArrayList<String>();
      myArr.add("Italian Riviera");
      myArr.add("Jersey Shore");
      myArr.add("Puerto Rico");
      myArr.add("Los Cabos Corridor");
      myArr.add("Lubmin");
      myArr.add("Coney Island");
      myArr.add("Karlovy Vary");
      myArr.add("Bourbon-l'Archambault");
      myArr.add("Walt Disney World Resort");
      myArr.add("Barbados");
      sharedQueue.put(myArr);
      System.out.println("Produced: " + i+Thread.currentThread());
      }
      catch (InterruptedException ex)
      {
      Logger.getLogger(Producer.class.getName()).log(Lev el.SEVERE, null, ex);
      }
      }
      Done = true;
      System.out.println(Done);

      }

      }
      //Consumer Class in Java
      class Consumer implements Runnable{
      private final BlockingQueue sharedQueue;
      protected boolean Done;

      public Consumer (BlockingQueue sharedQueue,boolean Done) {
      this.sharedQueue = sharedQueue;
      this.Done = Done;
      }
      public void run() {
      while(!Done || !sharedQueue.isEmpty()){
      try {
      System.out.println("Consumed: "+Thread.currentThread()+ sharedQueue.take());
      Thread.sleep(1000);
      } catch (InterruptedException ex) {
      Logger.getLogger(Consumer.class.getName()).log(Lev el.SEVERE, null, ex);
      }
      }
      }
      }
        • 1. Re: Multithreading using ExecutorService in Java!!
          Kayaman
          First format the code using
           tags.                                                                                                                                                                                                                        
          • 2. Re: Multithreading using ExecutorService in Java!!
            ManojC
            Please find the code below.

            import java.util.ArrayList;
            import java.util.LinkedList;
            import java.util.List;
            import java.util.concurrent.ArrayBlockingQueue;
            import java.util.concurrent.BlockingQueue;
            import java.util.concurrent.Callable;
            import java.util.concurrent.ExecutorService;
            import java.util.concurrent.Executors;
            import java.util.concurrent.LinkedBlockingQueue;
            import java.util.logging.Level;
            import java.util.logging.Logger;

            public class ProducerConsumerPattern {
                 protected static boolean Done = false;

                 public static void main(String args[]) {
                      // Creating shared object
                      ArrayBlockingQueue sharedQueue = new ArrayBlockingQueue(100);
                      ExecutorService reportExecutor = Executors.newFixedThreadPool(2);
                      ExecutorService updateExecutor = Executors.newFixedThreadPool(10);
                      Producer producer = new Producer(sharedQueue, Done);
                      Consumer consumer = new Consumer(sharedQueue, Done);
                      // Creating Producer and Consumer Thread
                      reportExecutor.execute(producer);
                      updateExecutor.execute(consumer);
                      reportExecutor.shutdown();
                      if (reportExecutor.isShutdown()) {
                           Done = true;
                      }
                      updateExecutor.shutdown();
                      // Starting producer and Consumer thread
                 }
            }

            // Producer Class in java
            class Producer implements Runnable {
                 private final BlockingQueue sharedQueue;
                 protected boolean Done;

                 public Producer(BlockingQueue sharedQueue, boolean Done) {
                      this.sharedQueue = sharedQueue;
                      this.Done = Done;
                 }

                 public void run() {
                      for (int i = 0; i < 10; i++) {
                           try {
                                ArrayList<String> myArr = new ArrayList<String>();
                                myArr.add("Italian Riviera");
                                myArr.add("Jersey Shore");
                                myArr.add("Puerto Rico");
                                myArr.add("Los Cabos Corridor");
                                myArr.add("Lubmin");
                                myArr.add("Coney Island");
                                myArr.add("Karlovy Vary");
                                myArr.add("Bourbon-l'Archambault");
                                myArr.add("Walt Disney World Resort");
                                myArr.add("Barbados");
                                sharedQueue.put(myArr);
                                System.out.println("Produced: " + i + Thread.currentThread());
                           } catch (InterruptedException ex) {
                                Logger.getLogger(Producer.class.getName()).log(Level.SEVERE,
                                          null, ex);
                           }
                      }
                      Done = true;
                      System.out.println(Done);

                 }

            }

            // Consumer Class in Java
            class Consumer implements Runnable {
                 private final BlockingQueue sharedQueue;
                 protected boolean Done;

                 public Consumer(BlockingQueue sharedQueue, boolean Done) {
                      this.sharedQueue = sharedQueue;
                      this.Done = Done;
                 }

                 public void run() {
                      while (!Done || !sharedQueue.isEmpty()) {
                           try {
                                System.out.println("Consumed: " + Thread.currentThread()
                                          + sharedQueue.take());
                                // Thread.sleep(1000);
                           } catch (InterruptedException ex) {
                                Logger.getLogger(Consumer.class.getName()).log(Level.SEVERE,
                                          null, ex);
                           }
                      }
                 }
            }
            • 3. Re: Multithreading using ExecutorService in Java!!
              Kayaman
              Format the code using the
               tags.
              [code]
              public void formattedCode() {
                  System.out.println("Hi! I'm easier to read!");
              }
              [/code]                                                                                                                                                                                                                                                                                            
              • 4. Re: Multithreading using ExecutorService in Java!!
                MrBabakishiyev
                First code must be in forum as below, and please write more detail about your problem.
                /*
                 * To change this template, choose Tools | Templates
                 * and open the template in the editor.
                 */
                package oraclediscussionforum;
                
                import java.util.ArrayList;
                import java.util.concurrent.ArrayBlockingQueue;
                import java.util.concurrent.BlockingQueue;
                import java.util.concurrent.ExecutorService;
                import java.util.concurrent.Executors;
                import java.util.logging.Level;
                import java.util.logging.Logger;
                
                public class ProducerConsumerPattern {
                
                    protected static boolean Done = false;
                
                    public static void main(String args[]) {
                // Creating shared object
                        ArrayBlockingQueue sharedQueue = new ArrayBlockingQueue(100);
                        ExecutorService reportExecutor = Executors.newFixedThreadPool(2);
                        ExecutorService updateExecutor = Executors.newFixedThreadPool(10);
                        Producer producer = new Producer(sharedQueue, Done);
                        Consumer consumer = new Consumer(sharedQueue, Done);
                // Creating Producer and Consumer Thread
                        reportExecutor.execute(producer);
                        updateExecutor.execute(consumer);
                        reportExecutor.shutdown();
                        if (reportExecutor.isShutdown()) {
                            Done = true;
                        }
                        updateExecutor.shutdown();
                // Starting producer and Consumer thread
                    }
                }
                
                // Producer Class in java
                class Producer implements Runnable {
                
                    private final BlockingQueue sharedQueue;
                    protected boolean Done;
                
                    public Producer(BlockingQueue sharedQueue, boolean Done) {
                        this.sharedQueue = sharedQueue;
                        this.Done = Done;
                    }
                
                    @Override
                    public void run() {
                        for (int i = 0; i < 10; i++) {
                            try {
                                ArrayList<String> myArr = new ArrayList<String>();
                                myArr.add("Italian Riviera");
                                myArr.add("Jersey Shore");
                                myArr.add("Puerto Rico");
                                myArr.add("Los Cabos Corridor");
                                myArr.add("Lubmin");
                                myArr.add("Coney Island");
                                myArr.add("Karlovy Vary");
                                myArr.add("Bourbon-l'Archambault");
                                myArr.add("Walt Disney World Resort");
                                myArr.add("Barbados");
                                sharedQueue.put(myArr);
                                System.out.println("Produced: " + i + Thread.currentThread());
                            } catch (InterruptedException ex) {
                                Logger.getLogger(Producer.class.getName()).log(Level.SEVERE,
                                        null, ex);
                            }
                        }
                        Done = true;
                        System.out.println(Done);
                
                    }
                }
                
                // Consumer Class in Java
                class Consumer implements Runnable {
                
                    private final BlockingQueue sharedQueue;
                    protected boolean Done;
                
                    public Consumer(BlockingQueue sharedQueue, boolean Done) {
                        this.sharedQueue = sharedQueue;
                        this.Done = Done;
                    }
                
                    @Override
                    public void run() {
                        while (!Done || !sharedQueue.isEmpty()) {
                            try {
                                System.out.println("Consumed: " + Thread.currentThread()
                                        + sharedQueue.take());
                // Thread.sleep(1000);
                            } catch (InterruptedException ex) {
                                Logger.getLogger(Consumer.class.getName()).log(Level.SEVERE,
                                        null, ex);
                            }
                        }
                    }
                }
                • 5. Re: Multithreading using ExecutorService in Java!!
                  jtahlborn
                  You only put one consumer instance in the thread pool, therefore only one thread is active. if you want 10 active threads, you will need to put 10 consumers in the thread pool.
                  • 6. Re: Multithreading using ExecutorService in Java!!
                    ManojC
                    Thanks for you inputs... After intializing the 10 consumer threads I am able to invoke multiple threads. In the program I would like to stop/kill the thread once the producer finished putting in the queue and queue is empty. Could you please advice me how to design this Runnable interface.