Java Multithreading Steeplechase: Cancelling Tasks In Executors

In the previous post on stopping threads, we explored thread design strategies to safely stop threads in Java. In this post, let’s look at various ways we can stop or cancel tasks handled by Executors (and ExecutorServices).

Standard Cancellation:

When we submit a task (a Callable or its older cousin, Runnable) to an ExecutorService (e.g. ThreadPoolExecutor) via submit(…), we get back a Future which wraps the task. It has a method called cancel(…) which can be called to cancel the task the Future wraps. Calling this method has different effects depending on the stage the task is in. A task could be in one of three possible stages after being submitted:

  1. The task hasn’t started executing yet – it is waiting in the work queue for a thread to start executing it.
  2. A thread is executing the task.
  3. The task has finished executing.

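Cancellation is trivial if the task hasn’t started executing: it is marked as cancelled and will never run, even if it lingers in the work queue until a worker thread discards it. Similarly, if the task has already finished executing, cancelling it has no effect and cancel(…) returns false.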

It is a little trickier when the task is executing in a thread. Recall from my previous post: to stop threads in Java, we rely on a co-operative mechanism called Interruption. The idea is very simple. To stop a thread, all we can do is deliver it a signal, aka interrupt it, requesting that the thread stop itself at the next available opportunity. If the thread cooperates, it will clean up after itself and stop. Non-cooperating threads ignore the request, and cancellation will have no effect.

From Javadocs:

boolean cancel(boolean mayInterruptIfRunning)
Attempts to cancel execution of this task. This attempt will fail if the task has already completed, has already been cancelled, or could not be cancelled for some other reason. If successful, and this task has not started when cancel is called, this task should never run. If the task has already started, then the mayInterruptIfRunning parameter determines whether the thread executing this task should be interrupted in an attempt to stop the task.

So when the task is already executing and we call cancel(true) on it, an interrupt signal is delivered to the thread executing the task. For this to work properly, your tasks must be designed to handle interruption. Refer to this post for more info.
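To make the three stages concrete, here is a minimal sketch (not code from the original post). It assumes a single-threaded executor so that a second task is guaranteed to wait in the queue; the class and method names are made up for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class CancelStagesDemo {

    /** Returns what cancel() reported for a queued, a running and a finished task. */
    static boolean[] cancelInEachStage() throws Exception {
        // A single worker thread, so the second task has to wait in the queue.
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch sawInterrupt = new CountDownLatch(1);
        try {
            // Occupies the only worker until it is interrupted.
            Future<?> running = pool.submit(() -> {
                started.countDown();
                try {
                    Thread.sleep(Long.MAX_VALUE);
                } catch (InterruptedException e) {
                    sawInterrupt.countDown(); // cooperative task: stop when asked
                }
            });
            started.await();

            // Stage 1: waiting in the queue, so it will never run.
            Future<?> queued = pool.submit(() -> System.out.println("never printed"));
            boolean queuedResult = queued.cancel(false);

            // Stage 2: executing, so cancel(true) interrupts the worker thread.
            boolean runningResult = running.cancel(true);
            sawInterrupt.await(5, TimeUnit.SECONDS);

            // Stage 3: already finished, so cancel() reports failure.
            Future<String> finished = pool.submit(() -> "done");
            finished.get();
            boolean finishedResult = finished.cancel(true);

            return new boolean[] {queuedResult, runningResult, finishedResult};
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        boolean[] r = cancelInEachStage();
        // prints "queued: true, running: true, finished: false"
        System.out.println("queued: " + r[0] + ", running: " + r[1] + ", finished: " + r[2]);
    }
}
```

Note that cancel(true) returning true for the running task only means the request was delivered. Whether the task actually stops still depends on the task reacting to the interrupt.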

Non-Standard Cancellation:

Sometimes it becomes necessary to support non-standard task cancellation, especially when the task relies on blocking or long-running methods that are oblivious to interruption. For example, ServerSocket.accept() blocks until a client connects. The catch is that it ignores all interruption requests, so a thread blocked in this method cannot be stopped using interrupts. There are two ways to support non-standard cancellation where interrupts won’t work. In both of them, you still have to do something to cancel the method that ignores interrupts. For ServerSocket, that means closing the underlying socket, which forces accept() to throw an exception.

1. Overriding Thread.interrupt():

Provide a custom ThreadFactory to the ExecutorService. Return custom Threads which override the interrupt method. For example:

Overriding the interrupt() method is not recommended.
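The original listing for this approach is not reproduced here, so the following is only a sketch of how it might look. ResourceClosingThread and its register(…) method are hypothetical names: the task registers the socket it is about to block on, and the overridden interrupt() closes that socket before delegating to super.interrupt().

```java
import java.io.Closeable;
import java.io.IOException;
import java.net.ServerSocket;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

class InterruptOverrideDemo {

    /** Hypothetical worker thread: interrupt() also closes a registered resource. */
    static class ResourceClosingThread extends Thread {
        private volatile Closeable resource;

        ResourceClosingThread(Runnable target) {
            super(target);
        }

        void register(Closeable resource) {
            this.resource = resource;
        }

        @Override
        public void interrupt() {
            Closeable r = resource;
            try {
                if (r != null) r.close(); // makes a blocked accept() throw
            } catch (IOException ignored) {
            } finally {
                super.interrupt();
            }
        }
    }

    /** Returns true if cancel(true) unblocked a task stuck in accept(). */
    static boolean cancelBlockedAccept() throws Exception {
        ThreadFactory factory = ResourceClosingThread::new;
        ExecutorService pool = Executors.newSingleThreadExecutor(factory);
        CountDownLatch listening = new CountDownLatch(1);
        CountDownLatch unblocked = new CountDownLatch(1);
        try {
            Future<?> task = pool.submit(() -> {
                try (ServerSocket server = new ServerSocket(0)) { // any free port
                    ((ResourceClosingThread) Thread.currentThread()).register(server);
                    listening.countDown();
                    server.accept(); // ignores interrupts, fails once the socket is closed
                } catch (IOException closed) {
                    unblocked.countDown();
                }
            });
            listening.await();
            task.cancel(true); // ends up calling our interrupt() on the worker thread
            return unblocked.await(5, TimeUnit.SECONDS);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("accept() unblocked: " + cancelBlockedAccept());
    }
}
```

One reason this approach is discouraged: the executor itself may call interrupt() on its worker threads (for example during shutdownNow()), so the override can fire at moments the task did not anticipate.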

2. Overriding Future.cancel(…):

In your tasks (e.g. Callable) provide a method for non-standard cancellation, such as cancelTask(). Then override the Future.cancel(…) to call cancelTask().

But think about it: We do not normally create a Future ourselves and specify what the cancel(…) method does: we get Future when we submit a task to an ExecutorService via ExecutorService.submit(…).

Luckily, AbstractExecutorService (the base class of ThreadPoolExecutor) calls a protected method named newTaskFor(Callable c) that returns the Future (or rather RunnableFuture) representing the task. Hence we need to override newTaskFor(…) to return a custom Future which overrides the cancel(…) method. This is shown below with an example:

Whereas the IdentifiableCallable is shown below:

Next we need to define our own FutureTaskWrapper so we can override the cancel(…) method:

Now we need to define our task as follows:

That’s all. Now when we call FutureTaskWrapper.cancel(…), it will in turn call cancelTask(), where we can do our non-standard cancellation.
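The listings for these steps are not reproduced here, so the following is a compact sketch of how the pieces could fit together, not the author’s original code. IdentifiableCallable, FutureTaskWrapper and cancelTask() are the names used in the text; CancellingExecutor, the single-thread pool configuration and the demo method are assumptions made for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class NewTaskForDemo {

    /** A Callable that knows how to cancel itself by non-standard means. */
    interface IdentifiableCallable<T> extends Callable<T> {
        void cancelTask();
    }

    /** Future whose cancel(...) first runs the task's own cancellation hook. */
    static class FutureTaskWrapper<T> extends FutureTask<T> {
        private final IdentifiableCallable<T> task;

        FutureTaskWrapper(IdentifiableCallable<T> task) {
            super(task);
            this.task = task;
        }

        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            task.cancelTask(); // e.g. close a socket the task is blocked on
            return super.cancel(mayInterruptIfRunning);
        }
    }

    /** Hypothetical executor that hands out FutureTaskWrapper for such tasks. */
    static class CancellingExecutor extends ThreadPoolExecutor {
        CancellingExecutor() {
            super(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        }

        @Override
        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            if (callable instanceof IdentifiableCallable) {
                return new FutureTaskWrapper<>((IdentifiableCallable<T>) callable);
            }
            return super.newTaskFor(callable);
        }
    }

    /** Returns true if cancelling the Future invoked cancelTask(). */
    static boolean cancelCallsHook() {
        AtomicBoolean hookCalled = new AtomicBoolean(false);
        CancellingExecutor pool = new CancellingExecutor();
        try {
            Future<String> future = pool.submit(new IdentifiableCallable<String>() {
                @Override
                public String call() throws InterruptedException {
                    Thread.sleep(Long.MAX_VALUE); // stand-in for a blocking call
                    return "unreachable";
                }

                @Override
                public void cancelTask() {
                    hookCalled.set(true);
                }
            });
            future.cancel(true);
            return hookCalled.get();
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("cancelTask() called: " + cancelCallsHook());
    }
}
```

In this sketch cancelTask() runs on every cancel(…) call, even if the task has already finished, so the hook should be safe to call more than once.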

The entire code used in this post is available on GitHub.

Java Multithreading Steeplechase: Executors

Historical Perspective on Tasks & Threads

Tasks are activities that perform some action or do calculations. For example, a task could calculate prime numbers up to some upper limit. Good tasks do not depend on other tasks: they are independent. In this post, when I refer to tasks, I mean independent tasks.

Tasks in Java can be represented by a very simple interface called Runnable that has only one method: run(). This single method neither returns a value nor can throw checked exceptions.

public interface Runnable {
    void run();
}

Many newcomers to Java presume Threads to be the primary abstraction for running tasks. This means that a task can be submitted to a thread which then runs the task. In fact, the Thread class has constructors which take a Runnable for execution:

Thread(Runnable target)
Thread(Runnable target, String name)
Thread(ThreadGroup group, Runnable target)
...

There are obvious benefits in separating tasks from threads.

A task, defined by implementing Runnable, is handed to a Thread for execution. The Thread doesn’t know anything about the task, and the same Thread class can run many different kinds of tasks.
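As a small illustration of this separation, here is a sketch using the prime-counting task mentioned earlier. The names PrimeCounter and countPrimesOnThread are made up for this example.

```java
class RunnableOnThreadDemo {

    /** The task: counts primes up to a limit. It knows nothing about threads. */
    static class PrimeCounter implements Runnable {
        private final int limit;
        private int count; // run() cannot return a value, so the result is stored here

        PrimeCounter(int limit) {
            this.limit = limit;
        }

        @Override
        public void run() {
            int found = 0;
            for (int n = 2; n <= limit; n++) {
                if (isPrime(n)) found++;
            }
            count = found;
        }

        int count() {
            return count;
        }

        private static boolean isPrime(int n) {
            for (int d = 2; (long) d * d <= n; d++) {
                if (n % d == 0) return false;
            }
            return true;
        }
    }

    /** Runs the task on a dedicated thread and waits for it to finish. */
    static int countPrimesOnThread(int limit) throws InterruptedException {
        PrimeCounter task = new PrimeCounter(limit);
        Thread worker = new Thread(task, "prime-worker"); // Thread(Runnable, String)
        worker.start();
        worker.join(); // join() also makes the worker's write to count visible here
        return task.count();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(countPrimesOnThread(100)); // prints 25
    }
}
```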

Enter Executor:

Executor was introduced in Java 1.5 as a clean abstraction for executing tasks. The mantle passed from Thread to Executor. According to the Java API, an Executor:

“… executes submitted Runnable tasks.  This interface provides a way of decoupling task submission from the mechanics of how each task will be run, including details of thread use, scheduling, etc.” In essence, Executor is an interface, whose simplicity rivals that of Runnable:

public interface Executor {
    void execute(Runnable command);
}

The ‘very simple’ Executor interface forms the basis for a very powerful asynchronous task execution framework. It is based on the Producer-Consumer pattern: producers produce tasks and consumer threads execute these tasks.
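To see what “decoupling task submission from the mechanics of how each task will be run” means in practice, here is a sketch of two trivial Executor implementations in the spirit of the examples in the Executor javadoc. The demo class and the threadUsedBy helper are assumptions for illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;

class SimpleExecutorsDemo {

    /** Runs each task synchronously in the caller's thread. */
    static class DirectExecutor implements Executor {
        @Override
        public void execute(Runnable command) {
            command.run();
        }
    }

    /** Starts a brand new thread for every task. */
    static class ThreadPerTaskExecutor implements Executor {
        @Override
        public void execute(Runnable command) {
            new Thread(command, "task-thread").start();
        }
    }

    /** Submits one task and reports the name of the thread that ran it. */
    static String threadUsedBy(Executor executor) throws InterruptedException {
        BlockingQueue<String> names = new LinkedBlockingQueue<>();
        // The submitting code is identical no matter how the task gets executed.
        executor.execute(() -> names.add(Thread.currentThread().getName()));
        return names.take();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(threadUsedBy(new DirectExecutor()));        // the caller's thread
        System.out.println(threadUsedBy(new ThreadPerTaskExecutor())); // task-thread
    }
}
```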

ExecutorService

There is little chance you will ever use Executor directly. It is a very powerful yet feature-starved interface with a lone method for executing tasks. Fortunately for us, it has a famous child called ExecutorService, which provides lifecycle support such as shutdown, task tracking and the ability to retrieve results.
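A minimal sketch of the lifecycle side, assuming a small fixed pool created through the Executors factory: after shutdown() the service finishes the work it already accepted but rejects anything new.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

class LifecycleDemo {

    /** Returns {new task rejected, terminated in time, isTerminated()}. */
    static boolean[] shutDownGracefully() throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        pool.submit(() -> System.out.println("working"));

        pool.shutdown(); // stop accepting tasks; already submitted ones still run

        boolean rejected = false;
        try {
            pool.submit(() -> System.out.println("too late"));
        } catch (RejectedExecutionException e) {
            rejected = true; // the default policy rejects tasks after shutdown
        }

        // Wait for the accepted work to drain.
        boolean terminated = pool.awaitTermination(5, TimeUnit.SECONDS);
        return new boolean[] {rejected, terminated, pool.isTerminated()};
    }

    public static void main(String[] args) throws InterruptedException {
        boolean[] r = shutDownGracefully();
        System.out.println("rejected=" + r[0] + " terminated=" + r[1] + " isTerminated=" + r[2]);
    }
}
```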

Tracking Task Progress via Future

ExecutorService defines a method called `submit(Runnable task)` which returns a `Future` that can be used to track the task’s progress and to cancel it (if desired). Future is an interface. From its javadocs:

“A Future represents the result of an asynchronous computation. Methods are provided to check if the computation is complete, to wait for its completion, and to retrieve the result of the computation. The result can only be retrieved using method get when the computation has completed, blocking if necessary until it is ready. Cancellation is performed by the cancel method. Additional methods are provided to determine if the task completed normally or was cancelled. Once a computation has completed, the computation cannot be cancelled.”
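The methods described in the javadoc can be exercised with a short sketch. The latch is only there to hold the task back so that isDone() can be observed before and after completion; the class and method names are illustrative.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class FutureTrackingDemo {

    /** Returns "isDone-before result isDone-after isCancelled" for one Runnable task. */
    static String track() throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch release = new CountDownLatch(1);
        try {
            Future<?> future = pool.submit(() -> {
                try {
                    release.await(); // hold the task until the caller lets it go
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });

            boolean doneBefore = future.isDone(); // false: the task is still held back
            release.countDown();
            Object result = future.get();         // blocks until run() has returned

            // A Runnable has no result, so get() yields null on success.
            return doneBefore + " " + result + " " + future.isDone() + " " + future.isCancelled();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(track()); // prints "false null true false"
    }
}
```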

RunnableFuture

Earlier on, I said that Runnable doesn’t return a value. A Runnable task can only indicate completion indirectly, for example by modifying a shared data structure. RunnableFuture is an interface that extends both Future and Runnable. It can be passed to any method which expects a Runnable, and its Future side gives access to the result.

So far we have only discussed interfaces (Executor, ExecutorService and Future). Before we look into concrete classes, let us consider one very important concept.

Thread Pool

A thread pool is a design pattern (http://en.wikipedia.org/wiki/Thread_pool_pattern). It has a task queue which holds incoming tasks and a pool of threads which take tasks from the queue and execute them.

[Figure: a sample thread pool (green boxes) with waiting tasks (blue) and completed tasks (yellow)]

The benefits of thread pools are thread re-use (creating a new thread for every task carries significant overhead) and improved responsiveness (there may already be an idle thread waiting when a task arrives).

Now let us discuss concrete classes.

AbstractExecutorService

This is a skeletal implementation of ExecutorService, providing default implementations for some of its methods.

public abstract class AbstractExecutorService
implements ExecutorService

ThreadPoolExecutor

This is an ExecutorService that applies the Thread Pool pattern to execute tasks. From its javadocs:

“An ExecutorService that executes each submitted task using one of possibly several pooled threads, normally configured using Executors factory methods.” It provides constructors and setters for configuring the pool size and the task queue. Its declaration:

public class ThreadPoolExecutor extends AbstractExecutorService
implements Executor, ExecutorService
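Thread re-use is easy to observe. The sketch below assumes a pool created with Executors.newFixedThreadPool (which returns a ThreadPoolExecutor under the hood) and records which threads ran the tasks; the helper name distinctThreads is made up.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ThreadPoolReuseDemo {

    /** Runs the given number of tasks and counts how many distinct threads ran them. */
    static int distinctThreads(int poolSize, int tasks) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        Set<String> names = ConcurrentHashMap.newKeySet();
        try {
            List<Future<?>> futures = new ArrayList<>();
            for (int i = 0; i < tasks; i++) {
                futures.add(pool.submit(() -> {
                    names.add(Thread.currentThread().getName());
                }));
            }
            for (Future<?> f : futures) {
                f.get(); // wait for every task to finish
            }
            return names.size();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        // Six tasks, but never more than two threads: the pool re-uses its workers.
        System.out.println("threads used: " + distinctThreads(2, 6));
    }
}
```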

FutureTask

FutureTask is a concrete implementation of RunnableFuture, and therefore of both Future and Runnable. From its javadoc:

“…provides a base implementation of Future, with methods to start and cancel a computation, query to see if the computation is complete, and retrieve the result of the computation.”

Since a FutureTask implements RunnableFuture, you can submit it directly to an ExecutorService.
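A minimal sketch of that idea: the FutureTask is handed to the executor as a Runnable via execute(…), and the very same object is then queried as a Future. The class and method names are illustrative.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

class FutureTaskDemo {

    static int computeWithFutureTask() throws Exception {
        // FutureTask wraps a Callable and is itself both a Runnable and a Future.
        FutureTask<Integer> task = new FutureTask<>(() -> 6 * 7);
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            pool.execute(task); // accepted because a FutureTask is a Runnable
            return task.get();  // queried because a FutureTask is a Future
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(computeWithFutureTask()); // prints 42
    }
}
```

execute(…) is used here rather than submit(…) because submit(…) would wrap the FutureTask in yet another Future.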

Callable:

Callables were introduced in Java 5 as the next version of Runnable. Just like Thread passed the mantle to Executor for task execution, Runnable passed the mantle to Callable for representing tasks.

Callables are used to represent tasks. Unlike Runnables, they can return a value and even throw checked exceptions. They even support generics.
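Both abilities show up on the Future: a returned value comes out of get(), and an exception thrown by call() comes out wrapped in an ExecutionException. A small sketch, with made-up names:

```java
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class CallableDemo {

    /** Runs a Callable on an executor and describes its outcome. */
    static String outcomeOf(Callable<Integer> task) throws InterruptedException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<Integer> future = pool.submit(task);
            return "value " + future.get();
        } catch (ExecutionException e) {
            // The task's own exception arrives wrapped in ExecutionException.
            return "failed: " + e.getCause().getMessage();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // prints "value 42"
        System.out.println(outcomeOf(() -> 6 * 7));
        // prints "failed: disk unavailable"
        System.out.println(outcomeOf(() -> {
            throw new IOException("disk unavailable"); // checked exceptions are allowed
        }));
    }
}
```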

Summary:

Executor and ExecutorService form a very powerful framework for asynchronous task execution. Future is a wrapper that provides a way to track a task’s progress and could be used to cancel it. Callable represents a task and allows the task to return a value and throw exceptions.

So you might ask why we still have Threads and Runnables if better choices are available in the form of Executor and Callable. As far as Callable vs. Runnable is concerned, the reason is purely backwards compatibility. Threads are not languishing in Java either: ExecutorService simply provides a cleaner abstraction for executing tasks, and its implementations still rely on Threads to execute them.

Java Multithreading Steeplechase: Stopping Threads

Let us cut to the chase: In Java, there is no way to quickly and reliably stop a thread.

Java language designers got drunk once and attempted to support forced thread termination by releasing the following methods: `Thread.stop()`, `Thread.suspend()` and `Thread.resume()`. However, when they became sober, they quickly realized their mistake and deprecated them. Abrupt thread termination is not so straightforward. A running thread, often described as a light-weight process, has its own stack and is the master of its own destiny (well, daemons are). It may own files and sockets. It may hold locks. Termination is not always easy: unpredictable consequences may arise if the thread is in the middle of writing to a file and is killed before it can finish writing. Or what about the monitor locks held by the thread when it is shot in the head? For more information on why `Thread.stop()` was deprecated, follow this link: http://docs.oracle.com/javase/6/docs/technotes/guides/concurrency/threadPrimitiveDeprecation.html

Anyways, back to the point.

In Java, there is no way to quickly and reliably stop a thread.

To stop threads in Java, we rely on a co-operative mechanism called Interruption. The concept is very simple. To stop a thread, all we can do is deliver it a signal, aka interrupt it, requesting that the thread stops itself at the next available opportunity. That’s all. There is no telling what the receiver thread might do with the signal: it may not even bother to check the signal; or even worse ignore it.

Once you start a thread, nothing can (safely) stop it, except the thread itself. At most, the thread could be simply asked – or interrupted – to stop itself.

Hence in Java, stopping threads is a two step procedure:

  • Sending stop signal to thread – aka interrupting it
  • Designing threads to act on interruption

A thread in Java can be interrupted by calling its `Thread.interrupt()` method. A thread can check whether it has been interrupted by calling `Thread.isInterrupted()`. A good thread must check for interruption at regular intervals. The following code fragment illustrates this:

public static void main(String[] args) throws Exception {

        /**
         * A Thread which is responsive to Interruption.
         */
        class ResponsiveToInterruption extends Thread {
            @Override public void run() {
                while (!Thread.currentThread().isInterrupted()) {
                    System.out.println("[Interruption Responsive Thread] Alive");
                }
                System.out.println("[Interruption Responsive Thread] bye**");

            }
        }

        /**
         * Thread that is oblivious to Interruption. It does not even check its
         * interruption status and doesn't even know it was interrupted.
         */
        class ObliviousToInterruption extends Thread {
            @Override public void run() {
                while (true) {
                    System.out.println("[Interruption Oblivious Thread] Alive");
                }
                // The statement below will never be reached.
                //System.out.println("[Interruption Oblivious Thread] bye");
            }
        }

        Thread theGood = new ResponsiveToInterruption();
        Thread theUgly = new ObliviousToInterruption();

        theGood.start();
        theUgly.start();

        theGood.interrupt(); // The thread will stop itself
        theUgly.interrupt(); // Will do nothing
}

 

[Interruption Oblivious Thread] Alive
[Interruption Responsive Thread] Alive
[Interruption Responsive Thread] Alive
[Interruption Oblivious Thread] Alive
[Interruption Responsive Thread] bye**
[Interruption Oblivious Thread] Alive
[Interruption Oblivious Thread] Alive
[Interruption Oblivious Thread] Alive
[Interruption Oblivious Thread] Alive
....

A well-designed thread checks its interrupt status at regular intervals and takes action when interrupted, usually by cleaning up and stopping itself.

Blocking Methods and Interruption:

A thread can check for interruption at regular intervals – e.g. as a loop condition – and take action when it is interrupted. Life would have been easy if it weren’t for those pesky blocking methods: these methods may “block” and take a long time to return, effectively delaying the calling thread’s ability to check for interruption in a timely manner. Methods like `Thread.sleep()`, `BlockingQueue.put()`, `ServerSocket.accept()` are some examples of blocking methods.

If the code is waiting on a blocked method, it may not check the interrupt status until the blocking method returns.

Blocking methods which support interruption usually throw an exception when they detect interruption, transferring control back to the caller. They throw either InterruptedException or ClosedByInterruptException to signal interruption to the caller. Let us consider an example. The code below calls `Thread.sleep()`. When it detects interruption, `Thread.sleep()` throws InterruptedException and the caller exits the loop. All blocking methods which throw InterruptedException also clear the interrupted status. You must either act on the interruption when you catch this exception or, at the very least, set the interrupted status again to allow code higher up the stack to act on it.

   @Override
   public void run() {
        while(true) {
            try {
                Thread.sleep(Long.MAX_VALUE);
            } catch (InterruptedException exit) {
                break; //Break out of the loop; ending thread
            }
        }
    }

This may sound preposterous, but code that does nothing on InterruptedException is “swallowing” the interruption, denying code higher up the stack the chance to act on it.
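When a method cannot act on the interruption itself, the usual alternative to swallowing it is to set the interrupted status again. A minimal sketch, where the pause(…) helper is a hypothetical low-level method and the loop in the worker plays the role of the “code higher up the stack”:

```java
class RestoreInterruptDemo {

    /** Low-level helper that cannot stop the thread itself, so it restores the flag. */
    static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            // sleep() cleared the interrupted status before throwing; set it again
            // so that callers can still see that an interrupt was requested.
            Thread.currentThread().interrupt();
        }
    }

    /** Returns true if the worker stopped after being interrupted. */
    static boolean workerStopsAfterInterrupt() throws InterruptedException {
        Thread worker = new Thread(() -> {
            // This loop only terminates because pause() restores the flag.
            while (!Thread.currentThread().isInterrupted()) {
                pause(10_000);
            }
        });
        worker.start();
        worker.interrupt();
        worker.join(5_000);
        return !worker.isAlive();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("worker stopped: " + workerStopsAfterInterrupt());
    }
}
```

If pause(…) had an empty catch block instead, the loop would keep going and the worker would sleep again as if nothing had happened.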

Interruption Oblivious Blocking Methods:

In the first code example in this post, we had two threads, ResponsiveToInterruption and ObliviousToInterruption. The former checked for interruption regularly, as its loop condition, whereas the latter didn’t even bother to check. Blocking methods in the Java library fall into the same two categories. The Good ones throw exceptions when they detect interruption, whereas the Ugly ones don’t do anything. Blocking methods in the java.net socket classes (Socket and ServerSocket) don’t respond to interruption. For example, the thread below cannot be stopped by interruption while it is waiting for clients. Only when a client connects does accept() return a Socket, giving the caller a chance to check for interruption:

        /**
         * Thread that checks for interruption, but calls a blocking method
         * that doesn't detect Interruptions.
         */
        class InterruptibleShesNot extends Thread {

            @Override
            public void run() {
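                // Bind once, outside the loop: re-binding port 8080 on every
                // pass would fail with a BindException.
                try (ServerSocket server = new ServerSocket(8080)) {
                    while (!Thread.currentThread().isInterrupted()) {
                        Socket client = server.accept(); // This method will not
                                                         // return or 'unblock'
                                                         // until a client connects
                        client.close(); // a real server would serve the client here
                    }
                } catch (IOException ignored) { }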

            }

        }

So how do you deal with blocking methods that do not respond to interruption? You will have to think outside the box and find ways to cancel the operation by other means. For example, socket operations throw SocketException when the underlying socket is closed (e.g. by `ServerSocket.close()` or `Socket.close()`). The code below takes advantage of this fact and closes the underlying socket, forcing blocking methods such as ServerSocket.accept() to throw SocketException.

package kitchensink;

import java.net.*;
import java.io.*;

/**
 * Demonstrates non-standard thread cancellation.
 *
 * @author umermansoor
 */
public class SocketCancellation {

    /**
     * ServerSocket.accept() doesn't detect or respond to interruption. The
     * class below overrides the interrupt() method to support non-standard
     * cancellation by canceling the underlying ServerSocket forcing the
     * accept() method to throw Exception, on which we act by breaking the while
     * loop.
     *
     * @author umermansoor
     */
    static class CancelleableSocketThread extends Thread {

        private final ServerSocket server;

        public CancelleableSocketThread(int port) throws IOException {
            server = new ServerSocket(port);
        }

        @Override
        public void interrupt() {
            try {
                server.close();
            } catch (IOException ignored) {
            } finally {
                super.interrupt();
            }
        }

        @Override
        public void run() {
            while (true) {
                try {
                    Socket client = server.accept();
                } catch (Exception se) {
                    break;
                }
            }
        }
    }

    /**
     * Main entry point.
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        CancelleableSocketThread cst = new CancelleableSocketThread(8080);
        cst.start();
        Thread.sleep(3000);
        cst.interrupt();
    }
}

Summary:

  • Threads cannot be stopped externally; they can only be delivered a signal to stop
  • It is up to the Thread to: i) check the interruption flag regularly, and ii) to act upon it
  • Sometimes checking for interruption is not possible because the thread is blocked in a blocking method, such as `BlockingQueue.put()`. Luckily, most blocking methods detect interruption and throw InterruptedException or ClosedByInterruptException
  • To support blocking methods that do not act on interruptions, non-standard cancellation mechanisms must be used, as illustrated in the last example

Extra:

The Thread class also has a static method called `interrupted()`. This is what it does: it clears the current thread’s interrupted status and returns its previous value. Use this method only when you know what you are doing or when you deliberately want to clear the interrupt status.
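The difference between the two methods is easiest to see side by side. In this sketch the current thread interrupts itself, which is safe because nothing is blocked at that point; the class and method names are illustrative.

```java
class InterruptedFlagDemo {

    /** Returns {isInterrupted(), first interrupted(), second interrupted()}. */
    static boolean[] readFlagThreeTimes() {
        Thread.currentThread().interrupt();                      // set our own flag

        boolean peek = Thread.currentThread().isInterrupted();   // true, flag left untouched
        boolean first = Thread.interrupted();                    // true, and clears the flag
        boolean second = Thread.interrupted();                   // false, already cleared

        return new boolean[] {peek, first, second};
    }

    public static void main(String[] args) {
        boolean[] r = readFlagThreeTimes();
        System.out.println(r[0] + " " + r[1] + " " + r[2]); // prints "true true false"
    }
}
```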