This discussion is archived
4 Replies Latest reply: Jan 23, 2013 7:10 PM by eno g. - oracle RSS

Creating multiple parallel tasks by a single service

eno g. - oracle Newbie
Currently Being Moderated
Hello,

The doc for Service clearly illustrates how to create a task to obtain the first line of a URL and returning it as a String (see below). What would one do if they were given an unknown number of urls to get that first line from each of them in parallel?

Lets suppose I have a class extending Task that would read that first line. Should I just create a Task inside the Service that loops through the given URLs and instantiates myTask for each of them?

Thanks,
Eno
public static class FirstLineService extends Service<String> {
   private StringProperty url = new SimpleStringProperty(this, "url");
   public final void setUrl(String value) { url.set(value); }
   public final String getUrl() { return url.get(); }
   public final StringProperty urlProperty() { return url; }

   protected Task createTask() {
       final String _url = getUrl();
       return new Task<String>() {
           protected String call() throws Exception {
                URL u = new URL(_url);
                BufferedReader in = new BufferedReader(
                        new InputStreamReader(u.openStream()));
                String result = in.readLine();
                in.close();
                return result;
           }
      };
   }
}
Edited by: Eno G. on Jan 23, 2013 4:43 PM

Edited by: Eno G. on Jan 23, 2013 4:45 PM
  • 1. Re: Creating multiple parallel tasks by a single service
    RichardBair Journeyer
    Currently Being Moderated
    As it turns out I'm working on [RT-18702|http://javafx-jira.kenai.com/browse/RT-18702?focusedCommentId=326742#comment-326742] at this moment. It might be related to what you want (or maybe not).

    In your case, I think what I'd do is in Task's call method, I'd spawn a bunch of threads to go handle the input in parallel, and block from returning from the call until all the various backgrounds threads have completed (maybe using a count down latch).
  • 2. Re: Creating multiple parallel tasks by a single service
    eno g. - oracle Newbie
    Currently Being Moderated
    Yep, looks like #3 is what I'm looking for... but I'm not sure I'd know how to implement your suggestion (novice at best :) ). I can play around with it a bit.

    Thanks
  • 3. Re: Creating multiple parallel tasks by a single service
    jsmith Guru
    Currently Being Moderated
    You could just create multiple Services.

    The code below create a couple of custom executors and demonstrates them executing groups of services in parallel or sequentially.
    An artificial sleep is added to the service body so that the service does not complete too quickly and you can clearly see what is going on.
    import java.io.*;
    import java.net.URL;
    import java.util.concurrent.*;
    import java.util.concurrent.atomic.AtomicInteger;
    import javafx.application.Application;
    import javafx.beans.property.*;
    import javafx.concurrent.*;
    import javafx.event.EventHandler;
    import javafx.scene.*;
    import javafx.scene.control.*;
    import javafx.scene.layout.*;
    import javafx.stage.Stage;
    
    public class FirstLineSequentialVsParallelService extends Application {
      private static final String[] URLs = {
        "http://www.google.com", 
        "http://www.yahoo.com", 
        "http://www.microsoft.com", 
        "http://www.oracle.com" 
      };
    
      private ExecutorService sequentialFirstLineExecutor;
      private ExecutorService parallelFirstLineExecutor;
    
      @Override public void init() throws Exception {
        sequentialFirstLineExecutor = Executors.newFixedThreadPool(
          1, 
          new FirstLineThreadFactory("sequential")
        );  
    
        parallelFirstLineExecutor = Executors.newFixedThreadPool(
          URLs.length, 
          new FirstLineThreadFactory("parallel")
        );  
      }
    
      @Override
      public void stop() throws Exception {
        parallelFirstLineExecutor.shutdown();
        parallelFirstLineExecutor.awaitTermination(3, TimeUnit.SECONDS);
    
        sequentialFirstLineExecutor.shutdown();
        sequentialFirstLineExecutor.awaitTermination(3, TimeUnit.SECONDS);
      }
      
      public static void main(String[] args) { launch(args); }
      @Override public void start(Stage stage) throws Exception {
        final VBox messages = new VBox();
        messages.setStyle("-fx-background-color: cornsilk; -fx-padding: 10;");
    
        messages.getChildren().addAll(
          new Label("Parallel Execution"), 
          new Label("------------------"), 
          new Label()
        );
        fetchFirstLines(messages, parallelFirstLineExecutor);
    
        messages.getChildren().addAll(
          new Label("Sequential Execution"), 
          new Label("--------------------"), 
          new Label()
        );
        fetchFirstLines(messages, sequentialFirstLineExecutor);
        
        messages.setStyle("-fx-font-family: monospace");
    
        stage.setScene(new Scene(messages, 600, 800));
        stage.show();
      }
    
      private void fetchFirstLines(final VBox monitoredLabels, ExecutorService executorService) {
        for (final String url: URLs) {
          final FirstLineService service = new FirstLineService();
          service.setExecutor(executorService);
          service.setUrl(url);
    
          final ProgressMonitoredLabel monitoredLabel = new ProgressMonitoredLabel(url);
          monitoredLabels.getChildren().add(monitoredLabel);
          monitoredLabel.progress.progressProperty().bind(service.progressProperty());
          
          service.setOnSucceeded(new EventHandler<WorkerStateEvent>() {
            @Override public void handle(WorkerStateEvent t) {
              monitoredLabel.addStrings(
                service.getMessage(),
                service.getValue()
              );
            }
          });
          service.start();
        }
      }
      
      public class ProgressMonitoredLabel extends HBox {
        final ProgressBar progress;
        final VBox labels;
                
        public ProgressMonitoredLabel(String initialString) {
          super(20);
          
          progress = new ProgressBar();
          labels   = new VBox();
          labels.getChildren().addAll(new Label(initialString), new Label());
          
          progress.setPrefWidth(100);
          progress.setMinWidth(ProgressBar.USE_PREF_SIZE);
          HBox.setHgrow(labels, Priority.ALWAYS);
          setMinHeight(80);
          
          getChildren().addAll(progress, labels);
        }
        
        public void addStrings(String... strings) {
          for (String string: strings) {
            labels.getChildren().add(
              labels.getChildren().size() - 1, 
              new Label(string)
            );
          }
        }
      }
      
      public static class FirstLineService extends Service<String> {
        private StringProperty url = new SimpleStringProperty(this, "url");
        public final void setUrl(String value) { url.set(value); }
        public final String getUrl() { return url.get(); }
        public final StringProperty urlProperty() { return url; }
        protected Task createTask() {
          final String _url = getUrl();
          return new Task<String>() {
            protected String call() throws Exception {
              updateMessage("Called on thread: " + Thread.currentThread().getName());
              URL u = new URL(_url);
              BufferedReader in = new BufferedReader(
                      new InputStreamReader(u.openStream()));
              String result = in.readLine();
              in.close();
              
              // pause just so that it really takes some time to run the task 
              // so that parallel execution behaviour can be observed.
              for (int i = 0; i < 100; i++) {
                updateProgress(i, 100);
                Thread.sleep(50); 
              }
              
              return result;
            }
         };
        }
      }
      
      static class FirstLineThreadFactory implements ThreadFactory {
        static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final String type;
        
        public FirstLineThreadFactory(String type) {
          this.type = type;
        }
                
        @Override public Thread newThread(Runnable runnable) {
          Thread thread = new Thread(runnable, "LineService-" + poolNumber.getAndIncrement() + "-thread-" + type);
          thread.setDaemon(true);
    
          return thread;
        }
      }  
    }
    Actually for the code above, you could just use Tasks instead of services (because every service is only executed once which is all a task is), but I guess, if you wanted, you could place the services in a pool (which you would need to create) and execute or reuse them or, kind of along the lines of what Richard is suggesting - create a single service which pools together the executed multiple tasks it can run concurrently. There are a couple of nice additional things you get from Richard's approach - for example, an aggregated view of the progress of the concurrent services or a notification that all services have completed, etc. But you may not need those things and something that you adapt from the sample code in this answer might suffice for your case.
  • 4. Re: Creating multiple parallel tasks by a single service
    eno g. - oracle Newbie
    Currently Being Moderated
    I do have them as Tasks right now. I'm using them to open files of various sizes and I would like to have a progress window that would show the progress/status of each file being loaded (and possibly cancel that process). Since I could open files at different times it made sense that I'd use a Service to show the progress window with the progress of each task. To me, it sounds exactly like the third use case that Richard has in the FR he's working on.

    I guess at this point I need to figure out a way to achieve this with what's available.

    Thanks

Legend

  • Correct Answers - 10 points
  • Helpful Answers - 5 points