3 Replies. Latest reply: Mar 1, 2012 12:22 PM by EJP

Facing an issue while coding for thread-safe class

921017 Newbie
Hi,

I am trying to write a Queue implementation that (1) can be accessed by multiple threads simultaneously, (2) reduces the load on the GC, and (3) avoids locking as far as possible.

Java's ConcurrentLinkedQueue class internally holds a linked list of nodes. Each object offered to the queue creates one internal node, and each poll operation makes one internal node eligible for GC.

I implemented my Queue as a linked list of nodes as well, but each of my nodes holds an array of size 256, so it can store that many data objects. This reduces both the number of new Node() calls and the number of nodes that become eligible for GC.

I tested the code with a producer/consumer setup. The queue works fine with one producer and one consumer, i.e. two threads. It does not work correctly with TWO producers AND TWO consumers. I ran the threads for a specified time period and then checked whether the offered count minus the polled count matches the remaining size() of the queue. Some objects go missing: size() is less than expected. Typically about 10 objects are lost out of some 300 million.

The queue also works fine with multiple producers and consumers when the consumers are started a bit late, about 1 second after the producers. In that case the consumers have something to consume.
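
The conservation check described above (offered minus polled should equal size() once all threads have finished) can be packaged as a small count-based harness. This is only a sketch: the class name ConservationCheck, the method run and the variable drift are made up for illustration, and it uses a fixed number of attempts per thread rather than the time limit used in the test program further down. Run against ConcurrentLinkedQueue it gives a baseline to compare any other Queue implementation against.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper, not part of the original post.
public class ConservationCheck {

    /**
     * Starts the given number of producer and consumer threads, lets each make
     * attemptsPerThread offer/poll attempts, waits for all of them, and returns
     * (offered - polled) - queue.size(). Zero means nothing was lost or duplicated.
     */
    static long run(final Queue<Integer> queue, int producers, int consumers,
                    final int attemptsPerThread) {
        final AtomicLong offered = new AtomicLong();
        final AtomicLong polled = new AtomicLong();
        Thread[] threads = new Thread[producers + consumers];
        for (int i = 0; i < threads.length; i++) {
            final boolean producer = i < producers;
            threads[i] = new Thread(new Runnable() {
                public void run() {
                    for (int n = 0; n < attemptsPerThread; n++) {
                        if (producer) {
                            if (queue.offer(Integer.valueOf(n))) offered.incrementAndGet();
                        } else {
                            // a null return is treated as "queue was empty"
                            if (queue.poll() != null) polled.incrementAndGet();
                        }
                    }
                }
            });
        }
        for (Thread t : threads) t.start();
        try {
            for (Thread t : threads) t.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        // All threads have finished, so the three numbers are stable here.
        return (offered.get() - polled.get()) - queue.size();
    }

    public static void main(String[] args) {
        long drift = run(new ConcurrentLinkedQueue<Integer>(), 2, 2, 1000000);
        System.out.println("drift = " + drift);
    }
}
```

Passing a FastQueue instance instead of the ConcurrentLinkedQueue would report the loss as a non-zero drift. Because consumers start at the same time as producers here, this corresponds to the consumerLag = 0 case.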

Please tell me if I have made a mistake or missed something in my code.

Details are as follows

PC: Intel Core 2 Duo, 3 GHz, Windows XP

java -version
java version "1.6.0_17"
Java(TM) SE Runtime Environment (build 1.6.0_17-b04)
Java HotSpot(TM) Client VM (build 14.3-b01, mixed mode, sharing)

Code for queue
import java.lang.reflect.Array;
import java.util.Collection;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Thread-safe queue with minimum garbage collection activity.
 */
public class FastQueue<T> implements Queue<T>
{
     private static final int ARRAY_SIZE = 256;
     private Class<T> clazz;
     private AtomicReference<QueueNode> headNode = new AtomicReference<FastQueue<T>.QueueNode>();
     private AtomicReference<QueueNode> tailNode = new AtomicReference<FastQueue<T>.QueueNode>();
     
     class QueueNode
     {
          private T[] array;
          private AtomicInteger remaining = new AtomicInteger(0);
          private AtomicInteger tailIndex = new AtomicInteger(-1);
          private AtomicInteger headIndex = new AtomicInteger(-1);
          private AtomicReference<QueueNode> nextNodeRef = new AtomicReference<FastQueue<T>.QueueNode>();
     }
     
     private FastQueue()
     {
     }
     
     /**
      * To get instance of queue.
      * @param clazz
      * @return
      */
     @SuppressWarnings({ "rawtypes", "unchecked" })
     public static FastQueue getInstance(Class clazz)
     {
          FastQueue instance = new FastQueue();
          instance.clazz = clazz;
          instance.init();
          return instance;
     }
     
     private void init()
     {
          QueueNode node = createNode();
          this.headNode.set(node);
          this.tailNode.set(node);
     }
     
     @SuppressWarnings("unchecked")
     private QueueNode createNode()
     {
          QueueNode node = new QueueNode();
          node.array = (T[]) Array.newInstance(clazz, ARRAY_SIZE);
          return node;
     }
     
     @Override
     public int size()
     {
          int size = 0;
          int remaining;
          int headIndex;
          QueueNode node = this.headNode.get();
          while(node != null)
          {
               remaining = node.remaining.get();
               headIndex = node.headIndex.get();
               if(remaining > 0 && headIndex < node.array.length)
                    size += remaining;
               node = node.nextNodeRef.get();
          }
          return size;
     }

     @Override
     public boolean isEmpty()
     {
          return (peek() == null);
     }

     @Override
     public boolean contains(Object o) { throw new UnsupportedOperationException(); }

     @Override
     public Iterator<T> iterator() { throw new UnsupportedOperationException(); }

     @Override
     public Object[] toArray() { throw new UnsupportedOperationException(); }

     @SuppressWarnings("hiding")
     @Override
     public <T> T[] toArray(T[] a) { throw new UnsupportedOperationException(); }

     @Override
     public boolean remove(Object o) { throw new UnsupportedOperationException(); }

     @Override
     public boolean containsAll(Collection<?> c) { throw new UnsupportedOperationException(); }

     @Override
     public boolean addAll(Collection<? extends T> c)
     {
          for(T ele : c)
               add(ele);
          return true;
     }

     @Override
     public boolean removeAll(Collection<?> c) { throw new UnsupportedOperationException(); }

     @Override
     public boolean retainAll(Collection<?> c) { throw new UnsupportedOperationException(); }

     @Override
     public void clear()
     {
          init();
     }

     @Override
     public boolean add(T e)
     {
          boolean success = offer(e);
          if(success)
               return true;
          else
               throw new IllegalStateException();
     }

     @Override
     public boolean offer(T e)
     {
          QueueNode node = this.tailNode.get();
          QueueNode newNode;
          int tailIndex;
          while(node != null)
          {
               tailIndex = node.tailIndex.incrementAndGet();     // reserve a place
               if(tailIndex < node.array.length)
               {
                    // OK. place present in node
                    node.array[tailIndex] = e;
                    node.remaining.incrementAndGet();
                    return true;
               }
               else
               {
                    // node full. so create a new node and try again.
                    if(node.nextNodeRef.get() == null)
                    {
                         newNode = createNode();
                         boolean success = node.nextNodeRef.compareAndSet(null, newNode);
                         if(success)     // this thread did it. so change tail pointer in main class
                         {
                              this.tailNode.set(newNode);
                         }
                    }
                    node = node.nextNodeRef.get();
               }
          }
          return false;
     }

     @Override
     public T remove()
     {
          T value = poll();
          if(value != null)
               return value;
          else
               throw new NoSuchElementException();
     }

     @Override
     public T poll()
     {
          QueueNode node = this.headNode.get();
          QueueNode nextNode;
          int headIndex;
          int remaining;
          while(node != null)
          {
               remaining = node.remaining.decrementAndGet();     // reserve one element
               if(remaining >= 0)
               {
                    headIndex = node.headIndex.incrementAndGet();
                    T value = node.array[headIndex];
                    node.array[headIndex] = null;
                    // if headIndex is last possible index, then this node can be removed.
                    if(headIndex == node.array.length-1)
                    {
                         nextNode = node.nextNodeRef.get();
                         if(nextNode != null)
                         {
                              this.headNode.set(nextNode);
                         }
                    }
                    return value;
               }
               else
               {
                    // if this node is not full then free the reserved element
                    if(node.headIndex.get() < node.array.length-1)
                    {
                         node.remaining.incrementAndGet();
                         return null;     // no element to poll
                    }
                    else
                    {
                         // this node is exhausted so try with next node
                         node = node.nextNodeRef.get();
                    }
               }
          }
          return null;
     }

     @Override
     public T element()
     {
          T ele = peek();
          if(ele != null)
               return ele;
          else
               throw new NoSuchElementException();
     }

     @Override
     public T peek()
     {
          QueueNode node = this.headNode.get();
          int headIndex;
          int remaining;
          while(node != null)
          {
               headIndex = node.headIndex.get();
               remaining = node.remaining.get();
               if(remaining > 0)
               {
                    return node.array[headIndex+1];     // OK. data present
               }
               else
               {
                    if(headIndex < node.array.length -1)
                         return null;     // no elements to peek
                    else
                         node = node.nextNodeRef.get();     // this node is exhausted. so check in next node
               }
          }
          return null;
     }
}
Code for test program
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;

public class TestFastQueue
{
     // Test parameters
     int testTime = 3;          // time for test in seconds
     int producerCount = 2;     // number of producer threads
     int consumerCount = 2;     // number of consumer threads
     int consumerLag = 0;     // lag in millisecond before starting consumer threads.
     
     static long endTime;
     static AtomicLong offerCount = new AtomicLong(0);
     static AtomicLong offerFailCount = new AtomicLong(0);
     static AtomicLong pollCount = new AtomicLong(0);
     static AtomicLong emptyPollCount = new AtomicLong(0);
     static Queue<Integer> queue;
     static Integer objectToOffer = new Integer(1);
     
     public static void main(String[] args) throws Exception
     {
          TestFastQueue obj = new TestFastQueue();
          obj.testMethod();
     }
     public void testMethod() throws Exception
     {
          queue = FastQueue.getInstance(Integer.class);
          //queue = new ConcurrentLinkedQueue<Integer>();
          
          Producer[] producers = new Producer[producerCount];
          for(int i = 0; i < producerCount; i++)
               producers[i] = new Producer();
          
          Consumer[] consumers = new Consumer[consumerCount];
          for(int i = 0; i < consumerCount; i++)
               consumers[i] = new Consumer();
          
          endTime = System.currentTimeMillis() + (testTime * 1000);
          
          for(int i = 0; i < producerCount; i++)
               producers[i].start();
          
          Thread.sleep(consumerLag);
          for(int i = 0; i < consumerCount; i++)
               consumers[i].start();
          
          for(int i = 0; i < producerCount; i++)
               producers[i].join();
          for(int i = 0; i < consumerCount; i++)
               consumers[i].join();

          System.out.println("Testing " + queue.getClass().getName() + " ---");
          System.out.println("Offered " + offerCount.get() + " Throughput " + (offerCount.get() / testTime));
          System.out.println("Polled " + pollCount.get() + " Throughput " + (pollCount.get() / testTime));
          System.out.println("Empty polled " + emptyPollCount.get());
          System.out.println("Offer failed " + offerFailCount.get());
          System.out.println("Remaining by size() " + queue.size());
          System.out.println("Offer poll difference " + (offerCount.get() - pollCount.get()));
          
//          int remaining = 0;
//          while(queue.poll() != null)
//               remaining += 1;
//          System.out.println("Remaining by poll " + remaining);     // just to verify
     }
     
     
     class Producer extends Thread
     {
          public void run()
          {
               boolean success;
               while(System.currentTimeMillis() < endTime)
               {
                    success = queue.offer(objectToOffer);
                    if(success)
                         offerCount.incrementAndGet();
                    else
                         offerFailCount.incrementAndGet();
               }
          }
     }
     class Consumer extends Thread
     {
          public void run()
          {
               Integer value;
               while(System.currentTimeMillis() < endTime)
               {
                    value = queue.poll();
                    if(value != null)
                         pollCount.incrementAndGet();
                    else
                         emptyPollCount.incrementAndGet();
               }
          }
     }
}
Thanks for reading so far and also thanks in advance for your answers

Niteen

  • 1. Re: Facing an issue while coding for thread-safe class
    EJP Guru
    I can't see anything thread-safe about this at all beyond the use of AtomicIntegers. That's not sufficient.
  • 2. Re: Facing an issue while coding for thread-safe class
    921017 Newbie
    Hi,

    What I was thinking is that when multiple threads try to offer objects at the same time, the atomic operation tailIndex.incrementAndGet() gives each thread a unique place in the array.
    And in case the node is full, the atomic operation nextNode.compareAndSet() lets exactly one thread attach a new node to the existing node, and then all threads will use the new node.

    Also, when multiple threads try to poll objects at the same time, each will secure one object through the atomic operation remaining.decrementAndGet() and then access one of the secured objects using headIndex.incrementAndGet().

    Am I missing something here? Is this not sufficient to make a concurrent queue? Java's ConcurrentLinkedQueue also uses atomic operations to update its internal references. That class also does not use 'synchronized' or any lock.

    Please suggest where I am going wrong.

    Thanks and Regards

    Niteen
  • 3. Re: Facing an issue while coding for thread-safe class
    EJP Guru
    No, it isn't sufficient. Each of those atomic operations is atomic only for the duration of its own call. If you use two of them in one expression or method, the combination doesn't magically become atomic: another thread can run between the two calls.
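
    To make that concrete, here is a minimal sketch of one way elements could go missing. It is not the posted queue: the class TwoStepSlots and its claim()/store() split are hypothetical, and the order of the steps is an assumption made for illustration. Only the field names (tailIndex, headIndex, remaining) come from the thread. The replay() method walks through one unlucky interleaving on a single thread, so the outcome is repeatable.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified stand-in for a single array node. The field names
// follow the thread; the split into claim()/store() is an assumption.
class TwoStepSlots {
    final Object[] items = new Object[4];
    final AtomicInteger tailIndex = new AtomicInteger(-1);
    final AtomicInteger headIndex = new AtomicInteger(-1);
    final AtomicInteger remaining = new AtomicInteger(0);

    // Producer, step 1: claim a unique slot and announce one more element.
    int claim() {
        int slot = tailIndex.incrementAndGet();
        remaining.incrementAndGet();
        return slot;
    }

    // Producer, step 2: write the element. Nothing ties this to step 1.
    void store(int slot, Object value) {
        items[slot] = value;
    }

    // Consumer: reserve one element, then read the next head slot.
    Object poll() {
        if (remaining.decrementAndGet() < 0) {
            remaining.incrementAndGet(); // nothing to take, undo the reservation
            return null;
        }
        return items[headIndex.incrementAndGet()];
    }

    // Replays one unlucky interleaving on a single thread.
    static Object[] replay() {
        TwoStepSlots q = new TwoStepSlots();
        int slot = q.claim();     // producer claims slot 0, remaining becomes 1
        Object first = q.poll();  // consumer runs now: slot 0 is still empty
        q.store(slot, "A");       // producer finally writes "A"
        Object second = q.poll(); // remaining is back to 0, so "A" is never handed out
        return new Object[] { first, second, q.items[0] };
    }
}
```

    Every individual call above is atomic, yet both polls return null and "A" stays stranded in slot 0. With real threads the window between the two producer steps is tiny, which would be consistent with losing only a handful of objects, and with the problem disappearing when the consumers start late.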

    It's the same reasoning that says you have to synchronize on a Vector yourself while iterating over it, even though all of Vector's methods are synchronized.
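
    For the Vector case, the usual idiom looks roughly like the sketch below. The class and method names (VectorIteration, sum) are made up for the example. Vector's synchronized methods lock on the Vector instance itself, so holding that same lock around the whole loop keeps other threads from adding or removing elements between one iteration step and the next.

```java
import java.util.Vector;

// Hypothetical helper, for illustration only.
class VectorIteration {
    // Sums the elements while holding the Vector's own monitor for the
    // entire iteration, not just for each individual method call.
    static int sum(Vector<Integer> v) {
        int total = 0;
        synchronized (v) {
            for (int x : v) {
                total += x;
            }
        }
        return total;
    }
}
```

    Without the synchronized block, each hasNext() and next() call would be safe on its own, but another thread could modify the Vector in between them.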
