3 Replies Latest reply: Mar 1, 2012 2:22 PM by EJP RSS

    Facing an issue while coding for thread-safe class

    921017
      Hi,

      I am trying to write a Queue implementation which 1. can be accessed by multiple threads simultaneously and 2. Will reduce load on GC and 3. Avoid locking as far as possible.

      Java's class ConcurrentLinkedQueue internally contains a linked list of nodes. So for each object offered to the queue, one internal node is created, and for each poll operation, this internal node becomes eligible for GC.

      I tried to implement Queue with linked list of nodes, but here my node contains array of size 256 to hold that many data objects. This will reduce both - the number of new Node() calls as well as number of nodes eligible for GC.

      I tested the code with some producer, consumer kind of setup. Queue works fine in case of one producer and one consumer – i.e. two threads. Queue does not works fine for TWO producers AND TWO consumers. I ran the threads for specified time period and then checked if offered count minus polled count matches with remaining size() of the queue. And I found that some objects are missing. The size() is less than expected. Typically about 10 objects are lost in some 300 million objects.

      Queue also works fine for multiple producers and consumers when consumers are started a bit late - about 1 sec. Here consumers have something to consume.

      Please tell if I have done some mistake or missed out something in my coding.

      Details are as follows

      PC – Intel core 2 Duo with 3 GHz with Windows XP

      java -version
      java version "1.6.0_17"
      Java(TM) SE Runtime Environment (build 1.6.0_17-b04)
      Java HotSpot(TM) Client VM (build 14.3-b01, mixed mode, sharing)

      Code for queue
      import java.lang.reflect.Array;
      import java.util.Collection;
      import java.util.Iterator;
      import java.util.NoSuchElementException;
      import java.util.Queue;
      import java.util.concurrent.atomic.AtomicInteger;
      import java.util.concurrent.atomic.AtomicReference;
      
      /**
       * Thread-safe queue with minimum garbage collection activity.
       */
      public class FastQueue<T> implements Queue<T>
      {
           private static final int ARRAY_SIZE = 256;
           private Class<T> clazz;
           private AtomicReference<QueueNode> headNode = new AtomicReference<FastQueue<T>.QueueNode>();
           private AtomicReference<QueueNode> tailNode = new AtomicReference<FastQueue<T>.QueueNode>();
           
           class QueueNode
           {
                private T[] array;
                private AtomicInteger remaining = new AtomicInteger(0);
                private AtomicInteger tailIndex = new AtomicInteger(-1);
                private AtomicInteger headIndex = new AtomicInteger(-1);
                private AtomicReference<QueueNode> nextNodeRef = new AtomicReference<FastQueue<T>.QueueNode>();
           }
           
           private FastQueue()
           {
           }
           
           /**
            * To get instance of queue.
            * @param clazz
            * @return
            */
           @SuppressWarnings({ "rawtypes", "unchecked" })
           public static FastQueue getInstance(Class clazz)
           {
                FastQueue instance = new FastQueue();
                instance.clazz = clazz;
                instance.init();
                return instance;
           }
           
           private void init()
           {
                QueueNode node = createNode();
                this.headNode.set(node);
                this.tailNode.set(node);
           }
           
           @SuppressWarnings("unchecked")
           private QueueNode createNode()
           {
                QueueNode node = new QueueNode();
                node.array = (T[]) Array.newInstance(clazz, ARRAY_SIZE);
                return node;
           }
           
           @Override
           public int size()
           {
                int size = 0;
                int remaining;
                int headIndex;
                QueueNode node = this.headNode.get();
                while(node != null)
                {
                     remaining = node.remaining.get();
                     headIndex = node.headIndex.get();
                     if(remaining > 0 && headIndex < node.array.length)
                          size += remaining;
                     node = node.nextNodeRef.get();
                }
                return size;
           }
      
           @Override
           public boolean isEmpty()
           {
                return (peek() == null);
           }
      
           @Override
           public boolean contains(Object o) { throw new UnsupportedOperationException(); }
      
           @Override
           public Iterator<T> iterator() { throw new UnsupportedOperationException(); }
      
           @Override
           public Object[] toArray() { throw new UnsupportedOperationException(); }
      
           @SuppressWarnings("hiding")
           @Override
           public <T> T[] toArray(T[] a) { throw new UnsupportedOperationException(); }
      
           @Override
           public boolean remove(Object o) { throw new UnsupportedOperationException(); }
      
           @Override
           public boolean containsAll(Collection<?> c) { throw new UnsupportedOperationException(); }
      
           @Override
           public boolean addAll(Collection<? extends T> c)
           {
                for(T ele : c)
                     add(ele);
                return true;
           }
      
           @Override
           public boolean removeAll(Collection<?> c) { throw new UnsupportedOperationException(); }
      
           @Override
           public boolean retainAll(Collection<?> c) { throw new UnsupportedOperationException(); }
      
           @Override
           public void clear()
           {
                init();
           }
      
           @Override
           public boolean add(T e)
           {
                boolean success = offer(e);
                if(success)
                     return true;
                else
                     throw new IllegalStateException();
           }
      
           @Override
           public boolean offer(T e)
           {
                QueueNode node = this.tailNode.get();
                QueueNode newNode;
                int tailIndex;
                while(node != null)
                {
                     tailIndex = node.tailIndex.incrementAndGet();     // reserve a place
                     if(tailIndex < node.array.length)
                     {
                          // OK. place present in node
                          node.array[tailIndex] = e;
                          node.remaining.incrementAndGet();
                          return true;
                     }
                     else
                     {
                          // node full. so create a new node and try again.
                          if(node.nextNodeRef.get() == null)
                          {
                               newNode = createNode();
                               boolean success = node.nextNodeRef.compareAndSet(null, newNode);
                               if(success)     // this thread did it. so change tail pointer in main class
                               {
                                    this.tailNode.set(newNode);
                               }
                          }
                          node = node.nextNodeRef.get();
                     }
                }
                return false;
           }
      
           @Override
           public T remove()
           {
                T value = poll();
                if(value != null)
                     return value;
                else
                     throw new NoSuchElementException();
           }
      
           @Override
           public T poll()
           {
                QueueNode node = this.headNode.get();
                QueueNode nextNode;
                int headIndex;
                int remaining;
                while(node != null)
                {
                     remaining = node.remaining.decrementAndGet();     // reserve one element
                     if(remaining >= 0)
                     {
                          headIndex = node.headIndex.incrementAndGet();
                          T value = node.array[headIndex];
                          node.array[headIndex] = null;
                          // if headIndex is last possible index, then this node can be removed.
                          if(headIndex == node.array.length-1)
                          {
                               nextNode = node.nextNodeRef.get();
                               if(nextNode != null)
                               {
                                    this.headNode.set(nextNode);
                               }
                          }
                          return value;
                     }
                     else
                     {
                          // if this node is not full then free the reserved element
                          if(node.headIndex.get() < node.array.length-1)
                          {
                               node.remaining.incrementAndGet();
                               return null;     // no element to poll
                          }
                          else
                          {
                               // this node is exhausted so try with next node
                               node = node.nextNodeRef.get();
                          }
                     }
                }
                return null;
           }
      
           @Override
           public T element()
           {
                T ele = peek();
                if(ele != null)
                     return ele;
                else
                     throw new NoSuchElementException();
           }
      
           @Override
           public T peek()
           {
                QueueNode node = this.headNode.get();
                int headIndex;
                int remaining;
                while(node != null)
                {
                     headIndex = node.headIndex.get();
                     remaining = node.remaining.get();
                     if(remaining > 0)
                     {
                          return node.array[headIndex+1];     // OK. data present
                     }
                     else
                     {
                          if(headIndex < node.array.length -1)
                               return null;     // no elements to peek
                          else
                               node = node.nextNodeRef.get();     // this node is exhausted. so check in next node
                     }
                }
                return null;
           }
      }
      Code for test program
      import java.util.Queue;
      import java.util.concurrent.ConcurrentLinkedQueue;
      import java.util.concurrent.atomic.AtomicLong;
      
      public class TestFastQueue
      {
           // Test parameters
           int testTime = 3;          // time for test in seconds
           int producerCount = 2;     // number of producer threads
           int consumerCount = 2;     // number of consumer threads
           int consumerLag = 0;     // lag in millisecond before starting consumer threads.
           
           static long endTime;
           static AtomicLong offerCount = new AtomicLong(0);
           static AtomicLong offerFailCount = new AtomicLong(0);
           static AtomicLong pollCount = new AtomicLong(0);
           static AtomicLong emptyPollCount = new AtomicLong(0);
           static Queue<Integer> queue;
           static Integer objectToOffer = new Integer(1);
           
           public static void main(String[] args) throws Exception
           {
                TestFastQueue obj = new TestFastQueue();
                obj.testMethod();
           }
           public void testMethod() throws Exception
           {
                queue = FastQueue.getInstance(Integer.class);
                //queue = new ConcurrentLinkedQueue<Integer>();
                
                Producer[] producers = new Producer[producerCount];
                for(int i = 0; i < producerCount; i++)
                     producers[i] = new Producer();
                
                Consumer[] consumers = new Consumer[consumerCount];
                for(int i = 0; i < consumerCount; i++)
                     consumers[i] = new Consumer();
                
                endTime = System.currentTimeMillis() + (testTime * 1000);
                
                for(int i = 0; i < producerCount; i++)
                     producers.start();
                
                Thread.sleep(consumerLag);
                for(int i = 0; i < consumerCount; i++)
                     consumers[i].start();
                
                for(int i = 0; i < producerCount; i++)
                     producers[i].join();
                for(int i = 0; i < consumerCount; i++)
                     consumers[i].join();

                System.out.println("Testing " + queue.getClass().getName() + " ---");
                System.out.println("Offered " + offerCount.get() + " Throughput " + (offerCount.get() / testTime));
                System.out.println("Polled " + pollCount.get() + " Throughput " + (pollCount.get() / testTime));
                System.out.println("Empty polled " + emptyPollCount.get());
                System.out.println("Offer failed " + offerFailCount.get());
                System.out.println("Remaining by size() " + queue.size());
                System.out.println("Offer poll difference " + (offerCount.get() - pollCount.get()));
                
      //          int remaining = 0;
      //          while(queue.poll() != null)
      //               remaining += 1;
      //          System.out.println("Remaining by poll " + remaining);     // just to verify
           }
           
           
           class Producer extends Thread
           {
                public void run()
                {
                     boolean success;
                     while(System.currentTimeMillis() < endTime)
                     {
                          success = queue.offer(objectToOffer);
                          if(success)
                               offerCount.incrementAndGet();
                          else
                               offerFailCount.incrementAndGet();
                     }
                }
           }
           class Consumer extends Thread
           {
                public void run()
                {
                     Integer value;
                     while(System.currentTimeMillis() < endTime)
                     {
                          value = queue.poll();
                          if(value != null)
                               pollCount.incrementAndGet();
                          else
                               emptyPollCount.incrementAndGet();
                     }
                }
           }
      }
      Thanks for reading so far and also thanks in advance for your answers
      
      Niteen                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                    
        • 1. Re: Facing an issue while coding for thread-safe class
          EJP
          I can't see anything thread-safe about this at all beyond the use of AtomicIntegers. That's not sufficient.
          • 2. Re: Facing an issue while coding for thread-safe class
            921017
            Hi,

            What I was thinking that when multiple threads try to offer objects at same time, because of atomic operation tailIndex.incrementAndGet(), each will secure unique place in the array.
            and in case node is full, because of atomic operation nextNode.compareAndSet() one thread will attach new node to existing node, and then all threads will use the new node.

            Also when multiple threads try to poll the objects at same time, they will secure one object each by atomic operation remaining.decrementAndGet() and then access one of the secured object using headIndex.incrementAndGet()

            Am I missing out something here? Is this not sufficient to make a concurrent queue. Java's ConcurrentLinkedQueue also uses atomic operation to update internal references. That class is also not using any 'synchronized' thing or any lock.

            Please suggest where I am going wrong.

            Thanks and Regards

            Niteen
            • 3. Re: Facing an issue while coding for thread-safe class
              EJP
              No it isn't sufficient. Those atomic things are only atomic while you're in them. If you use two of them in an expression, the expression doesn't magically become atomic.

              Same reasoning that says you have to synchronize on a Vector yourself while iterating over it, even though all Vector's methods are synchronized.