This discussion is archived
7 Replies Latest reply: Mar 29, 2013 9:02 PM by 999680 RSS

java multithread : Consumerproducer - problem ?

999680 Newbie
Currently Being Moderated
I have a problem with this java- code above. I guess it is quite common example by Liang. I do not totally understand what is is doing ? Could somebody explain it to me ?

Also I should change it so that there are two producers and producer tells "its name" when it taking/trying to take things from buffer. How to do the change ?


bw, kari



------------

import java.util.concurrent.*;
import java.util.concurrent.locks.*;

public class ConsumerProducer {
private static Buffer buffer = new Buffer();

public static void main(String[] args) {
// Create a thread pool with two threads
ExecutorService executor = Executors.newFixedThreadPool(2);
executor.execute(new ProducerTask());
executor.execute(new ConsumerTask());
executor.shutdown();
}

// A task for adding an int to the buffer
private static class ProducerTask implements Runnable {
public void run() {
try {
int i = 1;
while (true) {
System.out.println("Producer writes " + i);
buffer.write(i++); // Add a value to the buffer
// Put the thread into sleep
Thread.sleep((int)(Math.random() * 10000));
}
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
}

// A task for reading and deleting an int from the buffer
private static class ConsumerTask implements Runnable {
public void run() {
try {
while (true) {
System.out.println("\t\t\tConsumer reads " + buffer.read());
// Put the thread into sleep
Thread.sleep((int)(Math.random() * 10000));
}
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
}

// An inner class for buffer
private static class Buffer {
private static final int CAPACITY = 1; // buffer size
private java.util.LinkedList<Integer> queue =
new java.util.LinkedList<Integer>();

// Create a new lock
private static Lock lock = new ReentrantLock();

// Create two conditions
private static Condition notEmpty = lock.newCondition();
private static Condition notFull = lock.newCondition();

public void write(int value) {
lock.lock(); // Acquire the lock
try {
while (queue.size() == CAPACITY) {
System.out.println("Wait for notFull condition");
notFull.await();
}

queue.offer(value);
notEmpty.signal(); // Signal notEmpty condition
} catch (InterruptedException ex) {
ex.printStackTrace();
} finally {
lock.unlock(); // Release the lock
}
}

public int read() {
int value = 0;
lock.lock(); // Acquire the lock
try {
while (queue.isEmpty()) {
System.out.println("\t\t\tWait for notEmpty condition");
notEmpty.await();
}

value = queue.remove();
notFull.signal(); // Signal notFull condition
} catch (InterruptedException ex) {
ex.printStackTrace();
} finally {
lock.unlock(); // Release the lock
return value;
}
}
}
}

Legend

  • Correct Answers - 10 points
  • Helpful Answers - 5 points