This discussion is archived
6 Replies Latest reply: Dec 25, 2012 9:02 PM by ManojC RSS

Multithreading using ExecutorService in Java!!

ManojC Explorer
Currently Being Moderated
Hi,

In my program I would like to introduce multithreading with executor service and arrayblockingqueue. Once I run the below code for the Consumer pool only one thread was active even thogh the pool size is '10'. Please help me to fine tune this program.

Code:
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;


public class ProducerConsumerPattern
{
protected static boolean Done=false;
public static void main(String args[])
{
//Creating shared object
ArrayBlockingQueue sharedQueue = new ArrayBlockingQueue(100);
ExecutorService reportExecutor = Executors.newFixedThreadPool(1);
ExecutorService updateExecutor = Executors.newFixedThreadPool(10);
Producer producer = new Producer(sharedQueue,Done);
Consumer consumer = new Consumer(sharedQueue,Done);
//Creating Producer and Consumer Thread
reportExecutor.execute(producer);
updateExecutor.execute(consumer);
reportExecutor.shutdown();
if(reportExecutor.isShutdown())
{
Done = true;
}
updateExecutor.shutdown();
//Starting producer and Consumer thread
}
}
//Producer Class in java
class Producer implements Runnable {
private final BlockingQueue sharedQueue;
protected boolean Done;


public Producer(BlockingQueue sharedQueue,boolean Done)
{
this.sharedQueue = sharedQueue;
this.Done = Done;
}
public void run() {
for(int i=0; i<10; i++)
{
try {
ArrayList<String> myArr = new ArrayList<String>();
myArr.add("Italian Riviera");
myArr.add("Jersey Shore");
myArr.add("Puerto Rico");
myArr.add("Los Cabos Corridor");
myArr.add("Lubmin");
myArr.add("Coney Island");
myArr.add("Karlovy Vary");
myArr.add("Bourbon-l'Archambault");
myArr.add("Walt Disney World Resort");
myArr.add("Barbados");
sharedQueue.put(myArr);
System.out.println("Produced: " + i+Thread.currentThread());
}
catch (InterruptedException ex)
{
Logger.getLogger(Producer.class.getName()).log(Lev el.SEVERE, null, ex);
}
}
Done = true;
System.out.println(Done);

}

}
//Consumer Class in Java
class Consumer implements Runnable{
private final BlockingQueue sharedQueue;
protected boolean Done;

public Consumer (BlockingQueue sharedQueue,boolean Done) {
this.sharedQueue = sharedQueue;
this.Done = Done;
}
public void run() {
while(!Done || !sharedQueue.isEmpty()){
try {
System.out.println("Consumed: "+Thread.currentThread()+ sharedQueue.take());
Thread.sleep(1000);
} catch (InterruptedException ex) {
Logger.getLogger(Consumer.class.getName()).log(Lev el.SEVERE, null, ex);
}
}
}
}
  • 1. Re: Multithreading using ExecutorService in Java!!
    Kayaman Guru
    Currently Being Moderated
    First format the code using
     tags.                                                                                                                                                                                                                        
  • 2. Re: Multithreading using ExecutorService in Java!!
    ManojC Explorer
    Currently Being Moderated
    Please find the code below.

    import java.util.ArrayList;
    import java.util.LinkedList;
    import java.util.List;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.logging.Level;
    import java.util.logging.Logger;

    public class ProducerConsumerPattern {
         protected static boolean Done = false;

         public static void main(String args[]) {
              // Creating shared object
              ArrayBlockingQueue sharedQueue = new ArrayBlockingQueue(100);
              ExecutorService reportExecutor = Executors.newFixedThreadPool(2);
              ExecutorService updateExecutor = Executors.newFixedThreadPool(10);
              Producer producer = new Producer(sharedQueue, Done);
              Consumer consumer = new Consumer(sharedQueue, Done);
              // Creating Producer and Consumer Thread
              reportExecutor.execute(producer);
              updateExecutor.execute(consumer);
              reportExecutor.shutdown();
              if (reportExecutor.isShutdown()) {
                   Done = true;
              }
              updateExecutor.shutdown();
              // Starting producer and Consumer thread
         }
    }

    // Producer Class in java
    class Producer implements Runnable {
         private final BlockingQueue sharedQueue;
         protected boolean Done;

         public Producer(BlockingQueue sharedQueue, boolean Done) {
              this.sharedQueue = sharedQueue;
              this.Done = Done;
         }

         public void run() {
              for (int i = 0; i < 10; i++) {
                   try {
                        ArrayList<String> myArr = new ArrayList<String>();
                        myArr.add("Italian Riviera");
                        myArr.add("Jersey Shore");
                        myArr.add("Puerto Rico");
                        myArr.add("Los Cabos Corridor");
                        myArr.add("Lubmin");
                        myArr.add("Coney Island");
                        myArr.add("Karlovy Vary");
                        myArr.add("Bourbon-l'Archambault");
                        myArr.add("Walt Disney World Resort");
                        myArr.add("Barbados");
                        sharedQueue.put(myArr);
                        System.out.println("Produced: " + i + Thread.currentThread());
                   } catch (InterruptedException ex) {
                        Logger.getLogger(Producer.class.getName()).log(Level.SEVERE,
                                  null, ex);
                   }
              }
              Done = true;
              System.out.println(Done);

         }

    }

    // Consumer Class in Java
    class Consumer implements Runnable {
         private final BlockingQueue sharedQueue;
         protected boolean Done;

         public Consumer(BlockingQueue sharedQueue, boolean Done) {
              this.sharedQueue = sharedQueue;
              this.Done = Done;
         }

         public void run() {
              while (!Done || !sharedQueue.isEmpty()) {
                   try {
                        System.out.println("Consumed: " + Thread.currentThread()
                                  + sharedQueue.take());
                        // Thread.sleep(1000);
                   } catch (InterruptedException ex) {
                        Logger.getLogger(Consumer.class.getName()).log(Level.SEVERE,
                                  null, ex);
                   }
              }
         }
    }
  • 3. Re: Multithreading using ExecutorService in Java!!
    Kayaman Guru
    Currently Being Moderated
    Format the code using the
     tags.
    [code]
    public void formattedCode() {
        System.out.println("Hi! I'm easier to read!");
    }
    [/code]                                                                                                                                                                                                                                                                                            
  • 4. Re: Multithreading using ExecutorService in Java!!
    MrBabakishiyev Newbie
    Currently Being Moderated
    First code must be in forum as below, and please write more detail about your problem.
    /*
     * To change this template, choose Tools | Templates
     * and open the template in the editor.
     */
    package oraclediscussionforum;
    
    import java.util.ArrayList;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.logging.Level;
    import java.util.logging.Logger;
    
    public class ProducerConsumerPattern {
    
        protected static boolean Done = false;
    
        public static void main(String args[]) {
    // Creating shared object
            ArrayBlockingQueue sharedQueue = new ArrayBlockingQueue(100);
            ExecutorService reportExecutor = Executors.newFixedThreadPool(2);
            ExecutorService updateExecutor = Executors.newFixedThreadPool(10);
            Producer producer = new Producer(sharedQueue, Done);
            Consumer consumer = new Consumer(sharedQueue, Done);
    // Creating Producer and Consumer Thread
            reportExecutor.execute(producer);
            updateExecutor.execute(consumer);
            reportExecutor.shutdown();
            if (reportExecutor.isShutdown()) {
                Done = true;
            }
            updateExecutor.shutdown();
    // Starting producer and Consumer thread
        }
    }
    
    // Producer Class in java
    class Producer implements Runnable {
    
        private final BlockingQueue sharedQueue;
        protected boolean Done;
    
        public Producer(BlockingQueue sharedQueue, boolean Done) {
            this.sharedQueue = sharedQueue;
            this.Done = Done;
        }
    
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                try {
                    ArrayList<String> myArr = new ArrayList<String>();
                    myArr.add("Italian Riviera");
                    myArr.add("Jersey Shore");
                    myArr.add("Puerto Rico");
                    myArr.add("Los Cabos Corridor");
                    myArr.add("Lubmin");
                    myArr.add("Coney Island");
                    myArr.add("Karlovy Vary");
                    myArr.add("Bourbon-l'Archambault");
                    myArr.add("Walt Disney World Resort");
                    myArr.add("Barbados");
                    sharedQueue.put(myArr);
                    System.out.println("Produced: " + i + Thread.currentThread());
                } catch (InterruptedException ex) {
                    Logger.getLogger(Producer.class.getName()).log(Level.SEVERE,
                            null, ex);
                }
            }
            Done = true;
            System.out.println(Done);
    
        }
    }
    
    // Consumer Class in Java
    class Consumer implements Runnable {
    
        private final BlockingQueue sharedQueue;
        protected boolean Done;
    
        public Consumer(BlockingQueue sharedQueue, boolean Done) {
            this.sharedQueue = sharedQueue;
            this.Done = Done;
        }
    
        @Override
        public void run() {
            while (!Done || !sharedQueue.isEmpty()) {
                try {
                    System.out.println("Consumed: " + Thread.currentThread()
                            + sharedQueue.take());
    // Thread.sleep(1000);
                } catch (InterruptedException ex) {
                    Logger.getLogger(Consumer.class.getName()).log(Level.SEVERE,
                            null, ex);
                }
            }
        }
    }
  • 5. Re: Multithreading using ExecutorService in Java!!
    jtahlborn Expert
    Currently Being Moderated
    You only put one consumer instance in the thread pool, therefore only one thread is active. if you want 10 active threads, you will need to put 10 consumers in the thread pool.
  • 6. Re: Multithreading using ExecutorService in Java!!
    ManojC Explorer
    Currently Being Moderated
    Thanks for you inputs... After intializing the 10 consumer threads I am able to invoke multiple threads. In the program I would like to stop/kill the thread once the producer finished putting in the queue and queue is empty. Could you please advice me how to design this Runnable interface.

Legend

  • Correct Answers - 10 points
  • Helpful Answers - 5 points