12 Replies Latest reply: Apr 28, 2008 5:14 PM by 807591

    ExecutorService and waiting for threads to complete.

    807591
      Hi everyone,
      I'm trying to use threads to run regular expressions over a log file (millions of lines long) that I need to process quickly, so I wrote the program below, which hands the work to a thread pool. I know I need to make sure all the threads have finished executing before proceeding.
      I just used a shutdown() method call from the ExecutorService class, because from the documentation:
      http://java.sun.com/javase/6/docs/api/java/util/concurrent/ExecutorService.html#shutdown()

      That is, a shutdown() call doesn't accept any further tasks but lets all previously submitted tasks run to completion. So my question is: do I need to do anything more to check that all the threads have run to completion?
      Here's my program below:
      import java.io.FileNotFoundException;
      import java.util.ArrayList;
      import java.util.Collections;
      import java.util.Comparator;
      import java.util.Map;
      import java.util.Scanner;
      import java.util.concurrent.ConcurrentHashMap;
      import java.util.concurrent.ExecutorService;
      import java.util.concurrent.Executors;
      import java.util.regex.Matcher;
      import java.util.regex.Pattern;
      
      public class ExecutorServiceTest
      {
           public static void main(String[] args) throws FileNotFoundException
           {
                Scanner in = new Scanner(new java.io.FileReader("C:/workspace/Test Programs/files/o10k.ap"));
                final Map<String, Integer> counts = new ConcurrentHashMap<String, Integer>();
                final Pattern pattern = java.util.regex.Pattern.compile("GET /ongoing/When/\\d\\d\\dx/(\\d\\d\\d\\d/\\d\\d/\\d\\d/[^ .]+) ");
                ExecutorService service = Executors.newCachedThreadPool();
                
                while (in.hasNextLine())
                {
                     final String nextline = in.nextLine();
      
                     service.execute(new Runnable()
                     {
                          public void run()
                          {
                               Matcher matcher = pattern.matcher(nextline);
                               if (matcher.find())
                               {
                                    String str = matcher.group(1);
                                    if (counts.containsKey(str))
                                         counts.put(str, counts.get(str) + 1);
                                    else
                                         counts.put(str, 1);
                               }                         
                          }
                     });               
                }
                
                service.shutdown();
                ArrayList<String> keys = new ArrayList<String>(counts.keySet());
                Collections.sort(keys, new Comparator<String>()
                          {
                     public int compare(String x, String y)
                     {
                          return counts.get(y) - counts.get(x);
                     }
                          });
                for (int i = 0; i < 10; i++)
                     System.out.println(counts.get(keys.get(i)) + ": " + keys.get(i));
           }
      }
        • 1. Re: ExecutorService and waiting for threads to complete.
          807591
          I don't know if this will help, but you can store references to your threads in a data structure (such as an ArrayList) and, at the line where you have:
          service.shutdown();
          you can iterate through that structure and call join() on each thread in it. That way you are guaranteed not to go any further until every thread has completed its run() method.

          Hope that helps.
          • 2. Re: ExecutorService and waiting for threads to complete.
            807591
            Hmm, I'm not sure join() is the right way to go when ExecutorService offers methods that do the same job. I'm looking at another code example from "Java Concurrency in Practice", and this is what they have:
            public <T> Collection<T> getParallelResults(List<Node<T>> nodes) throws InterruptedException 
            {
                    ExecutorService exec = Executors.newCachedThreadPool();
                    Queue<T> resultQueue = new ConcurrentLinkedQueue<T>();
                    parallelRecursive(exec, nodes, resultQueue);
                    exec.shutdown();
                    exec.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
                    return resultQueue;
                }
            The only thing they do beyond my code is an additional call to awaitTermination(), that is, they block until all the tasks have finished executing after calling shutdown(). Does that seem right?
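The difference between shutdown() alone and shutdown() followed by awaitTermination() can be sketched as below. This is a minimal illustration, not code from the thread or the book: the class name AwaitTerminationSketch, the helper runAndWait, the pool size, and the one-minute timeout are all assumptions chosen for the example. It also checks the boolean that awaitTermination() returns, which is false when the timeout elapses before the pool has terminated.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class AwaitTerminationSketch {
    // Hypothetical helper: runs taskCount trivial tasks and returns how many
    // had finished by the time the pool reported termination.
    static int runAndWait(int taskCount) {
        final AtomicInteger finished = new AtomicInteger();
        ExecutorService exec = Executors.newFixedThreadPool(4);
        for (int i = 0; i < taskCount; i++) {
            exec.execute(new Runnable() {
                public void run() {
                    finished.incrementAndGet();
                }
            });
        }
        // shutdown() only stops new submissions; it returns immediately.
        exec.shutdown();
        try {
            // awaitTermination() is the call that actually blocks.
            if (!exec.awaitTermination(1, TimeUnit.MINUTES)) {
                exec.shutdownNow();
                throw new IllegalStateException("tasks did not finish within the timeout");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return finished.get();
    }

    public static void main(String[] args) {
        System.out.println(runAndWait(1000)); // prints 1000
    }
}
```

Because awaitTermination() returned true before the counter is read, a separate isTerminated() check afterwards would add nothing.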
            • 3. Re: ExecutorService and waiting for threads to complete.
              807591
              The join() method is there to ensure each task completes before the next task is called. Is this not what you need?
              • 4. Re: ExecutorService and waiting for threads to complete.
                807591
                No, all I want to ensure is that all the tasks which I handed over to the execute() method have finished before printing the contents of the Map.
                • 5. Re: ExecutorService and waiting for threads to complete.
                  807591
                  After you call shutdown, call awaitTermination. That's all you need.

                  If you want to reuse the executor instead of shutting it down, then use invokeAll() or submit(). Then call get() on all of your futures.
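The second option in the reply above, keeping the executor alive and waiting on futures instead, might look roughly like this. It is a sketch under assumptions: the class InvokeAllSketch and the helper totalLength are made up for illustration, and summing line lengths stands in for whatever per-line work is really needed. invokeAll() blocks until every submitted task has completed, and get() then returns each result.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class InvokeAllSketch {
    // Hypothetical helper: sums the lengths of the lines, one task per line.
    static int totalLength(ExecutorService exec, List<String> lines) {
        List<Callable<Integer>> tasks = new ArrayList<Callable<Integer>>();
        for (final String line : lines) {
            tasks.add(new Callable<Integer>() {
                public Integer call() {
                    return line.length();
                }
            });
        }
        try {
            int total = 0;
            // invokeAll() returns only after all tasks have completed.
            for (Future<Integer> future : exec.invokeAll(tasks)) {
                total += future.get();
            }
            return total;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        ExecutorService exec = Executors.newFixedThreadPool(4);
        try {
            // The same executor serves both batches; nothing is shut down in between.
            System.out.println(totalLength(exec, Arrays.asList("a", "bb", "ccc")));
            System.out.println(totalLength(exec, Arrays.asList("dddd")));
        } finally {
            exec.shutdown();
        }
    }
}
```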
                  • 6. Re: ExecutorService and waiting for threads to complete.
                    807591
                    James, thanks, that's what I think too.
                    • 7. Re: ExecutorService and waiting for threads to complete.
                      807591
                      This is what I have now; I don't think I can go wrong with this version, can I?
                      import java.util.ArrayList;
                      import java.util.Collections;
                      import java.util.Comparator;
                      import java.util.Map;
                      import java.util.Scanner;
                      import java.util.concurrent.ConcurrentHashMap;
                      import java.util.concurrent.ExecutorService;
                      import java.util.concurrent.Executors;
                      import java.util.concurrent.TimeUnit;
                      import java.util.regex.Matcher;
                      import java.util.regex.Pattern;
                      
                      public class ExecutorServiceTest
                      {
                           public static void main(String[] args) throws Exception
                           {
                                Scanner in = new Scanner(new java.io.FileReader("C:/workspace/Test Programs/files/o10k.ap"));
                                final Map<String, Integer> counts = new ConcurrentHashMap<String, Integer>();
                                final Pattern pattern = java.util.regex.Pattern.compile("GET /ongoing/When/\\d\\d\\dx/(\\d\\d\\d\\d/\\d\\d/\\d\\d/[^ .]+) ");
                                ExecutorService service = Executors.newCachedThreadPool();
                      
                                while (in.hasNextLine())
                                {
                                     final String nextline = in.nextLine();
                      
                                     service.execute(new Runnable()
                                     {
                                          public void run()
                                          {
                                               Matcher matcher = pattern.matcher(nextline);
                                               if (matcher.find())
                                               {
                                                    String str = matcher.group(1);
                                                    if (counts.containsKey(str))
                                                         counts.put(str, counts.get(str) + 1);
                                                    else
                                                         counts.put(str, 1);
                                               }                         
                                          }
                                     });               
                                }
                      
                                //Initiates an orderly shutdown in which previously submitted tasks are executed, but no new tasks will be accepted.
                                service.shutdown();
                                
                                //Blocks until all tasks have completed execution after a shutdown request.
                                service.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
                                
                                //If all tasks have completed following shut down.
                                if(service.isTerminated())
                                {
                                     ArrayList<String> keys = new ArrayList<String>(counts.keySet());
                                     Collections.sort(keys, new Comparator<String>()
                                               {
                                          public int compare(String x, String y)
                                          {
                                               return counts.get(y) - counts.get(x);
                                          }
                                               });
                                     for (int i = 0; i < 10; i++)
                                          System.out.println(counts.get(keys.get(i)) + ": " + keys.get(i));
                      
                                }
                      
                           }
                      }
                      • 8. Re: ExecutorService and waiting for threads to complete.
                        807591
                        Yes, your executor shutdown is correct. I think this might not be threadsafe:
                        if (counts.containsKey(str))
                             counts.put(str, counts.get(str) + 1);
                        else
                             counts.put(str, 1);
                        Shouldn't that have a synchronized(counts) {...} around it?
                        • 9. Re: ExecutorService and waiting for threads to complete.
                          807591
                          counts is a ConcurrentHashMap, so I think it's thread safe?
                          • 10. Re: ExecutorService and waiting for threads to complete.
                            807591
                            Each method on a ConcurrentHashMap is atomic, but you have two separate method calls. So after your containsKey, something else could execute before your put(...). Get rid of the ConcurrentHashMap and synchronize all uses of it in your Runnable.
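The reply above recommends a plain map guarded by synchronized. A different option, not proposed anywhere in this thread, is to keep the ConcurrentHashMap and collapse the check-then-put into a single atomic call. The sketch below uses merge(), which requires Java 8 or later and so postdates this discussion; the class AtomicCountSketch, the helper countConcurrently, and the pool size are assumptions for illustration.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class AtomicCountSketch {
    // Hypothetical helper: increments one key `increments` times from a pool
    // and returns the final count stored for that key.
    static int countConcurrently(final String key, int increments) {
        final ConcurrentMap<String, Integer> counts = new ConcurrentHashMap<String, Integer>();
        ExecutorService exec = Executors.newFixedThreadPool(8);
        for (int i = 0; i < increments; i++) {
            exec.execute(new Runnable() {
                public void run() {
                    // A single atomic read-modify-write, so no increment is lost.
                    // The containsKey()/get()/put() sequence offers no such guarantee.
                    counts.merge(key, 1, Integer::sum);
                }
            });
        }
        exec.shutdown();
        try {
            if (!exec.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("timed out waiting for tasks");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        Integer total = counts.get(key);
        return total == null ? 0 : total;
    }

    public static void main(String[] args) {
        System.out.println(countConcurrently("2006/01/01/Example", 10000)); // prints 10000
    }
}
```

On Java versions before 8, a ConcurrentHashMap<String, AtomicInteger> populated with putIfAbsent() is one way to get the same effect.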
                            • 11. Re: ExecutorService and waiting for threads to complete.
                              807591
                              What about this?
                              import java.util.ArrayList;
                              import java.util.Collections;
                              import java.util.Comparator;
                              import java.util.Map;
                              import java.util.Scanner;
                              import java.util.concurrent.ConcurrentHashMap;
                              import java.util.concurrent.ExecutorService;
                              import java.util.concurrent.Executors;
                              import java.util.concurrent.TimeUnit;
                              import java.util.regex.Matcher;
                              import java.util.regex.Pattern;
                              
                              public class ExecutorServiceTest
                              {
                                   public static void main(String[] args) throws Exception
                                   {
                                        Scanner in = new Scanner(new java.io.FileReader("C:/workspace/Test Programs/files/o10k.ap"));
                                        final Map<String, Integer> counts = new ConcurrentHashMap<String, Integer>();
                                        final Pattern pattern = java.util.regex.Pattern.compile("GET /ongoing/When/\\d\\d\\dx/(\\d\\d\\d\\d/\\d\\d/\\d\\d/[^ .]+) ");
                                        ExecutorService service = Executors.newCachedThreadPool();
                              
                                        while (in.hasNextLine())
                                        {
                                             final String nextline = in.nextLine();
                              
                                             service.execute(new Runnable()
                                             {
                                                  public void run()
                                                  {
                                                       Matcher matcher = pattern.matcher(nextline);
                                                       if (matcher.find())
                                                       {
                                                            String str = matcher.group(1);
                                                            synchronized (counts) 
                                                            {
                                                                 if (counts.containsKey(str))
                                                                      counts.put(str, counts.get(str) + 1);
                                                                 else
                                                                      counts.put(str, 1);
                                                            }
                                                       }                         
                                                  }
                                             });               
                                        }
                              
                                        //Initiates an orderly shutdown in which previously submitted tasks are executed, but no new tasks will be accepted.
                                        service.shutdown();
                              
                                        //Blocks until all tasks have completed execution after a shutdown request.
                                        service.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
                              
                                        //If all tasks have completed following shut down.
                                        if(service.isTerminated())
                                        {
                                             ArrayList<String> keys = new ArrayList<String>(counts.keySet());
                                             Collections.sort(keys, new Comparator<String>()
                                                       {
                                                  public int compare(String x, String y)
                                                  {
                                                       return counts.get(y) - counts.get(x);
                                                  }
                                                       });
                              
                                             for (int i = 0; i < Math.min(10, keys.size()); i++)
                                                  System.out.println(counts.get(keys.get(i)) + ": " + keys.get(i));
                              
                                        }
                              
                                   }
                              }
                              • 12. Re: ExecutorService and waiting for threads to complete.
                                807591
                                Yeah, that'll work. I'd still replace the ConcurrentHashMap with a regular HashMap since it doesn't buy you anything at this point.
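Putting the suggestions from the thread together (a regular HashMap, every access to it inside synchronized, and shutdown() followed by awaitTermination()) could look something like the sketch below. Several details are assumptions made so the example is self-contained: the class TopHitsSketch and helper topHits are invented names, the lines come from a List rather than the log file, the pool is fixed-size, and the timeout is one hour. It also caps the output at the number of distinct keys, so fewer than ten matches does not cause an IndexOutOfBoundsException.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class TopHitsSketch {
    static final Pattern PATTERN = Pattern.compile(
            "GET /ongoing/When/\\d\\d\\dx/(\\d\\d\\d\\d/\\d\\d/\\d\\d/[^ .]+) ");

    // Hypothetical helper: returns up to `limit` "count: key" strings, highest count first.
    static List<String> topHits(List<String> lines, int limit) {
        final Map<String, Integer> counts = new HashMap<String, Integer>();
        ExecutorService service = Executors.newFixedThreadPool(4);
        for (final String line : lines) {
            service.execute(new Runnable() {
                public void run() {
                    Matcher matcher = PATTERN.matcher(line);
                    if (matcher.find()) {
                        String key = matcher.group(1);
                        synchronized (counts) { // guards the read and the write together
                            Integer old = counts.get(key);
                            counts.put(key, old == null ? 1 : old + 1);
                        }
                    }
                }
            });
        }
        service.shutdown();
        try {
            if (!service.awaitTermination(1, TimeUnit.HOURS)) {
                throw new IllegalStateException("timed out waiting for tasks");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        final List<Map.Entry<String, Integer>> entries;
        synchronized (counts) { // same lock as the writers, so their updates are visible here
            entries = new ArrayList<Map.Entry<String, Integer>>(counts.entrySet());
        }
        Collections.sort(entries, new Comparator<Map.Entry<String, Integer>>() {
            public int compare(Map.Entry<String, Integer> x, Map.Entry<String, Integer> y) {
                return y.getValue().compareTo(x.getValue()); // descending by count
            }
        });
        List<String> result = new ArrayList<String>();
        for (int i = 0; i < Math.min(limit, entries.size()); i++) {
            result.add(entries.get(i).getValue() + ": " + entries.get(i).getKey());
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> sample = Arrays.asList(
                "GET /ongoing/When/200x/2006/01/01/A HTTP/1.1",
                "GET /ongoing/When/200x/2006/01/02/B HTTP/1.1",
                "GET /ongoing/When/200x/2006/01/01/A HTTP/1.1");
        for (String hit : topHits(sample, 10)) {
            System.out.println(hit);
        }
    }
}
```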