0 Replies Latest reply: May 25, 2013 2:40 PM by 1010975 RSS

    Cancelling a Future with interrupt causes Future.get to unblock prematurely

    1010975
      I consider myself a seasoned Java developer with a good grasp of concurrency. Still, I occasionally run into behavior that surprises me, such as this one:

      If you cancel a Future with interrupt, Future.get may unblock and return before the body of the task has had a chance to complete. This violates the happens-before relationship of Future.get: I expect Future.get() not to return until the whole body of the corresponding task has finished running. Instead, it returns too early!

      I am inclined to consider this behavior a bug, because it breaks the happens-before relation of Future.get, but maybe there is a reasonable explanation?

      Because code says more than words, I've written a JUnit test to illustrate the problem and a FixedExecutorService decorator which 'fixes' the behavior.

      -----
      package concurrent;

      import static java.util.concurrent.Executors.*;
      import static java.util.concurrent.TimeUnit.*;
      import static org.junit.Assert.*;

      import java.util.Arrays;
      import java.util.List;
      import java.util.concurrent.CancellationException;
      import java.util.concurrent.CountDownLatch;
      import java.util.concurrent.ExecutorService;
      import java.util.concurrent.Future;
      import java.util.concurrent.TimeoutException;
      import java.util.concurrent.atomic.AtomicReference;

      import org.junit.After;
      import org.junit.Test;
      import org.junit.runner.RunWith;
      import org.junit.runners.Parameterized;
      import org.junit.runners.Parameterized.Parameters;

      @RunWith(Parameterized.class)
      public class ExecutorServiceTest {

           private final ExecutorService executorService;

           public ExecutorServiceTest(ExecutorService executorService) {
                this.executorService = executorService;
           }

           @After
           public void tearDown() throws Throwable {
                executorService.shutdownNow();
                executorService.awaitTermination(1000L, DAYS);
           }

           @Parameters
           public static List<Object[]> parameters() {
                // Note that ForkJoinPool is excluded from the set of test parameters because it never interrupts threads on
                // cancel, not even when mayInterruptIfRunning is true.
                // This test is specifically targets cancel with interrupt, so there is no point in trying to run it on a
                // ForkJoinPool. The test would just time out.
                return Arrays.asList(
                          // Show how default ExecutorServices fail the test.
                          new Object[] {
                               newFixedThreadPool(2)
                          },
                          new Object[] {
                               newCachedThreadPool()
                          },
                          new Object[] {
                               newSingleThreadExecutor()
                          },
                          // Shows how the FixedExecutorService decorator fixes the behavior.
                          new Object[] {
                               new FixedExecutorService(newFixedThreadPool(2))
                          },
                          new Object[] {
                               new FixedExecutorService(newCachedThreadPool())
                          },
                          new Object[] {
                               new FixedExecutorService(newSingleThreadExecutor())
                          });
           }

           @Test(timeout = 5000L)
           public void futureBlocksUntilTaskDoneOnCancelWithInterrupt() throws Throwable {
                final CountDownLatch operationOnResourceStarted = new CountDownLatch(1);
                final CountDownLatch interruptibleOperationOnResource = new CountDownLatch(1);
                final CountDownLatch closeResource = new CountDownLatch(1);
                final AtomicReference<Exception> firstFailure = new AtomicReference<>();
                Runnable task = new Runnable() {

                     @Override
                     public void run() {
                          operationOnResourceStarted.countDown();
                          try {
                               interruptibleOperationOnResource.await();
                          } catch (InterruptedException expected) {
                          } catch (Exception failure) {
                               firstFailure.compareAndSet(null, failure);
                          } finally {
                               try {
                                    closeResource.await();
                               } catch (InterruptedException failure) {
                                    firstFailure.compareAndSet(null, failure);
                               }
                          }
                     }
                };
                Future<?> future = executorService.submit(task);
                // Wait until the task has actually started using the device before cancelling it.
                operationOnResourceStarted.await();
                // Cancel with interrupt.
                future.cancel(true);
                try {
                     future.get(50L, MILLISECONDS);
                     fail("Expected " + TimeoutException.class.getName());
                } catch (TimeoutException expected) {
                } catch (CancellationException unexpected) {
                     fail("It's too early for " + CancellationException.class.getName()
                               + "! The resource is still being closed!");
                } finally {
                     // Unblock the ExecutorService's thread, so that tearDown can properly close it.
                     closeResource.countDown();
                }
                try {
                     future.get(50L, MILLISECONDS);
                     fail("Expected " + CancellationException.class.getName());
                } catch (CancellationException expected) {
                }
                // Fail the test if there was a failure on the asynchronous task thread.
                if (firstFailure.get() != null) {
                     throw firstFailure.get();
                }
           }
      }
      -----
      package concurrent;

      import java.util.ArrayList;
      import java.util.Collection;
      import java.util.Iterator;
      import java.util.List;
      import java.util.Objects;
      import java.util.concurrent.Callable;
      import java.util.concurrent.CountDownLatch;
      import java.util.concurrent.ExecutionException;
      import java.util.concurrent.ExecutorService;
      import java.util.concurrent.Future;
      import java.util.concurrent.TimeUnit;
      import java.util.concurrent.TimeoutException;
      import java.util.concurrent.atomic.AtomicInteger;

      /**
       * {@link ExecutorService} decorator whose futures do not return from {@code get} until the body of the
       * submitted task has finished running, even when the future was cancelled with interrupt.
       *
       * <p>Every task passed to {@code submit} or {@code invokeAll} is wrapped so that it signals a per-task
       * {@link Completion} when its body ends. The returned {@link Future} waits for that signal before it asks
       * the delegate's future for the outcome. A task that is cancelled before its body started is never waited
       * for, and its body is not run.
       *
       * <p>{@link #execute(Runnable)} and both {@code invokeAny} variants are forwarded unchanged, because they
       * do not hand out futures. {@code isDone()} and {@code isCancelled()} of the returned futures are forwarded
       * to the delegate's future unchanged.
       */
      public class FixedExecutorService implements ExecutorService {

           private final ExecutorService delegate;

           /**
            * @param delegate the executor that actually runs the tasks
            * @throws NullPointerException if {@code delegate} is {@code null}
            */
           public FixedExecutorService(ExecutorService delegate) {
                this.delegate = Objects.requireNonNull(delegate, "delegate");
           }

           @Override
           public void execute(Runnable command) {
                delegate.execute(command);
           }

           @Override
           public void shutdown() {
                delegate.shutdown();
           }

           @Override
           public List<Runnable> shutdownNow() {
                return delegate.shutdownNow();
           }

           @Override
           public boolean isShutdown() {
                return delegate.isShutdown();
           }

           @Override
           public boolean isTerminated() {
                return delegate.isTerminated();
           }

           @Override
           public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
                return delegate.awaitTermination(timeout, unit);
           }

           @Override
           public <T> Future<T> submit(Callable<T> task) {
                FixedCallable<T> fixedTask = fix(task);
                Future<T> future = delegate.submit(fixedTask);
                return fixedTask.fix(future);
           }

           @Override
           public <T> Future<T> submit(Runnable task, T result) {
                FixedRunnable fixedTask = fix(task);
                Future<T> future = delegate.submit(fixedTask, result);
                return fixedTask.fix(future);
           }

           @Override
           public Future<?> submit(Runnable task) {
                FixedRunnable fixedTask = fix(task);
                Future<?> future = delegate.submit(fixedTask);
                return fixedTask.fix(future);
           }

           @Override
           public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
                List<FixedCallable<T>> fixedTasks = fix(tasks);
                List<Future<T>> futures = delegate.invokeAll(fixedTasks);
                return fix(fixedTasks, futures);
           }

           @Override
           public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
                     throws InterruptedException {
                List<FixedCallable<T>> fixedTasks = fix(tasks);
                List<Future<T>> futures = delegate.invokeAll(fixedTasks, timeout, unit);
                return fix(fixedTasks, futures);
           }

           @Override
           public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
                return delegate.invokeAny(tasks);
           }

           @Override
           public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
                     throws InterruptedException, ExecutionException, TimeoutException {
                return delegate.invokeAny(tasks, timeout, unit);
           }

           /** Wraps a runnable so that the end of its body can be observed. */
           private FixedRunnable fix(Runnable task) {
                return new FixedRunnable(task);
           }

           /** Wraps a callable so that the end of its body can be observed. */
           private <V> FixedCallable<V> fix(Callable<V> task) {
                return new FixedCallable<>(task);
           }

           /** Wraps every callable of {@code tasks}, keeping the iteration order. */
           private <T> List<FixedCallable<T>> fix(Collection<? extends Callable<T>> tasks) {
                List<FixedCallable<T>> fixedTasks = new ArrayList<>(tasks.size());
                for (Callable<T> task : tasks) {
                     fixedTasks.add(fix(task));
                }
                return fixedTasks;
           }

           /**
            * Pairs each delegate future with the wrapper of the task it belongs to. Relies on {@code futures}
            * being in the same order as {@code fixedTasks}, which is what {@code invokeAll} documents.
            */
           private <T> List<Future<T>> fix(Collection<FixedCallable<T>> fixedTasks, List<Future<T>> futures) {
                List<Future<T>> fixedFutures = new ArrayList<>(futures.size());
                Iterator<FixedCallable<T>> it = fixedTasks.iterator();
                for (Future<T> future : futures) {
                     fixedFutures.add(it.next().fix(future));
                }
                return fixedFutures;
           }

           /**
            * Tracks whether the body of one task has not started, has started, or was skipped, and lets other
            * threads wait until the body is guaranteed not to be running.
            */
           private static final class Completion {

                private static final int NOT_STARTED = 0;

                private static final int STARTED = 1;

                private static final int SKIPPED = 2;

                private final AtomicInteger state = new AtomicInteger(NOT_STARTED);

                private final CountDownLatch finished = new CountDownLatch(1);

                /**
                 * Claims the right to run the task body.
                 *
                 * @return {@code false} if the task was already marked as skipped, in which case the body must
                 *         not be run
                 */
                boolean tryStart() {
                     return state.compareAndSet(NOT_STARTED, STARTED);
                }

                /** Signals that a started body has ended, normally or exceptionally. */
                void finish() {
                     finished.countDown();
                }

                /** Releases waiters if the body has not started; from then on the body will not be run. */
                void skipIfNotStarted() {
                     if (state.compareAndSet(NOT_STARTED, SKIPPED)) {
                          finished.countDown();
                     }
                }

                void await() throws InterruptedException {
                     finished.await();
                }

                boolean await(long timeout, TimeUnit unit) throws InterruptedException {
                     return finished.await(timeout, unit);
                }
           }

           /** Runnable wrapper that signals its {@link Completion} when the wrapped body ends. */
           private static class FixedRunnable implements Runnable {

                private final Completion completion = new Completion();

                private final Runnable delegate;

                public FixedRunnable(Runnable delegate) {
                     this.delegate = Objects.requireNonNull(delegate, "task");
                }

                @Override
                public void run() {
                     if (!completion.tryStart()) {
                          // The future was cancelled before the body started; waiters were already released.
                          return;
                     }
                     try {
                          delegate.run();
                     } finally {
                          completion.finish();
                     }
                }

                private <R> Future<R> fix(Future<R> future) {
                     return new FixedFuture<>(future, completion);
                }
           }

           /** Callable wrapper that signals its {@link Completion} when the wrapped body ends. */
           private static class FixedCallable<V> implements Callable<V> {

                private final Callable<V> delegate;

                private final Completion completion = new Completion();

                public FixedCallable(Callable<V> delegate) {
                     this.delegate = Objects.requireNonNull(delegate, "task");
                }

                @Override
                public V call() throws Exception {
                     if (!completion.tryStart()) {
                          // Only reachable after the future was cancelled, so this value is never handed out.
                          return null;
                     }
                     try {
                          return delegate.call();
                     } finally {
                          // Without this, get() on the returned future would wait forever.
                          completion.finish();
                     }
                }

                private Future<V> fix(Future<V> future) {
                     return new FixedFuture<>(future, completion);
                }
           }

           /** Future that waits for the task body to end before it consults the delegate future. */
           private static class FixedFuture<R> implements Future<R> {

                private final Future<R> delegate;

                private final Completion completion;

                public FixedFuture(Future<R> delegate, Completion completion) {
                     this.delegate = delegate;
                     this.completion = completion;
                }

                @Override
                public boolean cancel(boolean mayInterruptIfRunning) {
                     boolean cancelled = delegate.cancel(mayInterruptIfRunning);
                     releaseIfCancelledBeforeStart();
                     return cancelled;
                }

                @Override
                public boolean isCancelled() {
                     return delegate.isCancelled();
                }

                @Override
                public boolean isDone() {
                     return delegate.isDone();
                }

                @Override
                public R get() throws InterruptedException, ExecutionException {
                     releaseIfCancelledBeforeStart();
                     completion.await();
                     return delegate.get();
                }

                @Override
                public R get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
                     // One deadline for both waits, so that the call never blocks for twice the timeout.
                     long deadline = System.nanoTime() + unit.toNanos(timeout);
                     releaseIfCancelledBeforeStart();
                     if (!completion.await(timeout, unit)) {
                          throw new TimeoutException();
                     }
                     long remainingNanos = Math.max(0L, deadline - System.nanoTime());
                     return delegate.get(remainingNanos, TimeUnit.NANOSECONDS);
                }

                /**
                 * A task cancelled before its body started will never signal completion by itself. This also
                 * covers futures that were cancelled directly on the delegate, for example by a timed
                 * {@code invokeAll}.
                 */
                private void releaseIfCancelledBeforeStart() {
                     if (delegate.isCancelled()) {
                          completion.skipIfNotStarted();
                     }
                }
           }
      }
      -----