Thread Utilities

Sleeping and Interruptions

Often, you'll see code like the example below

try {
   Thread.sleep(100);
} catch (InterruptedException e) {
   // nothing
}

tempus-fugit captures the annoying boiler plate code needed to reset the interrupt flat in situations where you can't or don't want to rethrow the InterruptedException.

Using the ThreadUtils.sleep method, the above code is rewritten as.

sleep(millis(100));

This ensures that the interrupt flag is reset and is more explicit about the duration of the sleep.

If you want to ensure the interrupt flag is reset for other code, you can use the ThreadUtils.resetInterruptFlagWhen method directly. The Interruptible interface is used to highlight that the lamda-like call you want to execute does in fact throw the InterruptedException. For example;

resetInterruptFlagWhen(new Interruptible<Void>() {
    public Void call() throws InterruptedException {
        Thread.sleep(100);
        return null;
    }
}); 

Extracting the lamda-like Interruptible to a method makes the code more expressive;

resetInterruptFlagWhen(sleepingIsInterrupted()); 

This is actually how the ThreadUtils.sleep method is implemented within tempus-fugit.

Scheduled Interruption

The Interrupter class allows you to schedule an interrupt on a thread after a specified duration. This can be useful when implementing timeouts on classes that support the use of interrupt as an interruption policy. For example, the code below sets up an interrupt to be scheduled after some timeout, before embarking on some potentially long running process. The Interrupter and Thread classes have been statically imported.

Interrupter interrupter = interrupt(currentThread()).after(timeout);
try {
    while (!currentThread().isInterrupted()) {
        // some long running process
    }
} finally {
    interrupter.cancel();
} 

The Interrupter spawns a thread which sleeps (using WaitFor) until the timeout expires. It then just calls interrupt on the passed in thread. It is important therefore to ensure you cancel the interrupt as above for the case when the long running process could finish before the timeout. The cancel has no affect if the timeout has already expired so using a finally block is recommended.

The DefaultTimeoutableCompletionService classes uses this approach to implement a java.util.concurrent.CompletionService-like service that will timeout and return any completed tasks and abandoning any remaining.

Thread Dumps

The ThreadDump class offers a programmatic way to print a thread dump. It's not recommended for production code but can be handy in tracking down unexpected behaviour interactively. Using ThreadDump.dumpThreads(System.out), for example, would yield something like the following (formatting inspired by jstack).

 Thread Reference Handler@2: (state = WAITING)
 - java.lang.Object.wait(Native Method)
 - java.lang.Object.wait(Object.java:474)
 - java.lang.ref.Reference$ReferenceHandler.run(Reference.java:116)

Thread main@1: (state = RUNNABLE)
 - java.lang.Thread.dumpThreads(Native Method)
 - java.lang.Thread.getAllStackTraces(Thread.java:1460)
 - com.google.code.tempusfugit.concurrency.ThreadDump.dumpThreads(ThreadDump.java:25)
  ...
 - com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:118)
 - com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:40)

Thread Signal Dispatcher@4: (state = RUNNABLE)

Thread Finalizer@3: (state = WAITING)
 - java.lang.Object.wait(Native Method)
 - java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:120)
 - java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:136)
 - java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:159)
 

The dumpThreads method will also detect any Java-level deadlocks, see the Deadlock Detection section for details.

Deadlock Detection

The DeadlockDetector class allows you to programmatically detect basic deadlocks in your Java code. You can output deadlocks using the following code (note that printing a thread dump using the ThreadDump class will automatically attempt to find any deadlocks).

DeadlockDetector.printDeadlocks(System.out);

There are various types of deadlock in concurrent systems, broadly speaking with regard to Java, they can be categorised as

  • Java monitor cyclic locking dependency
  • Java Lock cyclic locking dependency
  • External resource based dependency
  • Live lock

The DeadlockDecector class can only spot Java monitor cyclic locking problems. It's implementation is basically the same as that used by jconsole and jstack and suffers the same limitations. Java 1.6 versions of jstack and jconsole can additionally detect Lock based cyclic problems. The types of deadlock it can detect can be illustrated in the example below.

 public void potentialDeadlock() {
     new Kidnapper().start();
     new Negotiator().start();
 }

 public class Kidnapper extends Thread {
     public void run() {
         synchronized (nibbles) {
             synchronized (cash) {
                 take(cash);
             }
         }
     }
 }

 public class Negotiator extends Thread {
     public void run() {
         synchronized (cash) {
             synchronized (nibbles) {
                 take(nibbles);
             }
         }
     }
 } 

Here, the Kidnapper is unwilling to release poor Nibbles the Cat until he has the Cash but our Negotiator is unwilling to part with the Cash until he has poor Nibbles back in his arms. The deadlock detector displays this woeful situation as follows.

Deadlock detected
=================

"Negotiator-Thread-1":
  waiting to lock Monitor of com.google.code.tempusfugit.concurrency.DeadlockDetectorTest$Cat@ce4a8a
  which is held by "Kidnapper-Thread-0"

"Kidnapper-Thread-0":
  waiting to lock Monitor of com.google.code.tempusfugit.concurrency.DeadlockDetectorTest$Cash@7fc8b2
  which is held by "Negotiator-Thread-1"
             

Miscellaneous

The Conditions class offers some thread related Conditions including the following.

  • Conditions.isWaiting(Thread thread)

    The static method Conditions.isWaiting(Thread) offers a Condition that allows you to test if a thread is in a waiting state. Combining the condition with some classes from the temporal package allows you to wait for a thread to be in the waiting state. For example,

    waitOrTimeout(isWaiting(thread), timeout(seconds(10))); 
  • Conditions.is(Thread thread, Thread.State state)

    The static method Conditions.is(Thread, ThreadState) offers a more general purpose Condition to check that a thread is in a given state. For example,

    waitOrTimeout(is(thread, TERMINATED), timeout(seconds(10))); 
  • Conditions.isAlive(Thread thread)

    The static method Conditions.isAlive(Thread will check that a thread is alive. A thread is alive if it has been started and has not yet died. For example,

    System.out.println(isAlive(Thread.currentThread())); 
  • Conditions.shutdown(ExecutorService service)

    This method will check that a java.util.concurrent.ExecutorService has been shutdown according to the result of the it's isShutdown method. This might be useful if you'd like to wait for shutdown. The tempus-fugit ExecutorServiceShutdown class does just this.

    waitOrTimeout(Conditions.shutdown(executor), timeout);
  • Conditions.not(Condition condition)

    The NotCondition will negate the logical result of some other condition. For example, we can change the example above to wait until a thread is not in a waiting state by using the following.

    waitOrTimeout(not(isWaiting(thread)), timeout(seconds(10))); 
  • Not exclusively for concurrent use, the ExceptionWrapper class allows you to run arbitrary code in a Callable block catching any Throwable as an exception of your choice, embedded the originating exception as the new exception's cause. For example,
    ExceptionWrapper.wrapAnyException(new Callable<Object>() {
        @Override
        public Object call() throws ServiceException {
            // nasty code throwing a bunch of exceptions
        }
    }, WithException.with(CalendarException.class));
                        

    You can also catch any Exception and rethrow as a RuntimeException with the originating exception as the cause. The example below shows the anonymous Callable being created in the method something.

     ExceptionWrapper.wrapAsRuntimeException(something()); 
  • As a convenience class, the DefaultThreadFactory offers an implementation of java.util.concurrent.ThreadFactory that creates a thread using the single argument constructor of Thread.

Concurrency Utilities

Countdown Latch with Timeout

Using an instance of a java.util.concurrent.CountDownLatch, we can wait for the latch to count down to zero, blocking the calling thread before continuing. When passing in a timeout, the method returns true if the count reached zero or false if the timeout expires.

To make the timeout more explicit, the CountDownLatchWithTimeout class will throw a TimeoutException rather than force you to check. Using a static import, the example looks like the following.

private final CountDownLatch startup = new CountDownLatch(1);

public void waitForStartup() throws InterruptedException, TimeoutException {
    await(startup).with(TIMEOUT);
} 

The use of the with method is required. Following the micro-DSL approach, it is the with that actually does the waiting. Calling the await method on it's own will not block.

Execute Using Locks

Using implementations of the java.util.concurrent.locks.Lock requires that the lock is acquired and importantly released after use. This leads to the common code below.

Lock lock = new ReentrantLock();
lock.lock;
try {
   // something useful
} finally {
    lock.unlock();
} 

The ExecuteUsingLock class provides a way to abstract the lock acquisition and release and ensure consistency across your code. It takes a Callable representing the statements to execute whilst the lock is acquired and the actual lock to use when executing and ensures the lock is always released.

public void forExample {
    execute(something()).using(lock);
}

private Callable<Void, RuntimeException> something() {
    return new Callable<Void, RuntimeException>() {
        public Void call() throws RuntimeException {
            return null;
        }
    };
}

Timeoutable Completion Service

The TimeoutableCompletionService interface describes a service similar to a java.util.concurrent.CompletionService except that it will timeout after a specified duration if all results havn't yet been retrieved.

The default implementation, DefaultTimeoutableCompletionService, delegates to an underlying CompletionService but will stop waiting for submitted tasks to complete after a given timeout.

If the timeout expires before all tasks have been completed, the DefaultTimeoutableCompletionService will throw a TimeoutException. The exception thrown will also contain the results from any tasks that did manage to complete before the timeout. If all the submitted tasks complete before the timeout expires, the results are returned from the submit method and no exceptions are thrown.

In the example below, we'd like to create a status monitoring application that will output the status of a set of probes to a web page. However, the web page must be loaded within a few seconds regardless of whether all the probes have returned their results. The completion service can be used to execute individual status probe tasks in parallel, timing out after some duration.

private DefaultTimeoutableCompletionService completionService = new DefaultTimeoutableCompletionService(new ExecutorCompletionService(...));               

public void probe() {
    try {
        List<Callable<Result>> probes = factory.create();
        List<Result> results = completionService.submit(probes);
        write(results);
    } catch (TimeoutExceptionWithFutures e) {
        write(getResultsFrom(e));
    } catch (ExecutionException e) {
        throw new RuntimeException(e);
    }
} 

Here, the list of tasks to run in parallel is created by some factory class and assigned to the probes variable. These are just a list of Callable objects which will be executed by the underlying Executor, in our case, they'll look up the status of some probe and return a Result object.

The DefaultTimeoutableCompletionService is then used to schedule execution of the tasks with the submit method. At this point, the code will block and wait for all results to be returned or for the TimeoutExceptionWithFutures to be thrown. In the case of all tasks completing within the timeout, the results are just outputted using the write method.

For the case where the completion service timed out, a TimeoutException is thrown which includes any completed results. The write method in this case can just extract the partial set of results from the exception.

The default timeout for the service is thirty seconds but this can be changed using an alternative constructor.

JUnit Integration

The majority of abstractions and classes from tempus-fugit can be used within your testing framework of choice but there are also a couple of direct junit integrations that may be of use to you.

Intermittent Tests

As much as possible, you aim to have a completely deterministic tests but despite your best efforts, the odd flickering test can still get through. Occasionally, you might want to run such a test repeatedly to get an idea of its indeterminacy. The Intermittent annotation can be combined with the IntermittentTestRunner to provide this behaviour along side junit.

You simply mark a junit test method (or class) as potentially intermittent using the Intermittent annotation as follows.

@Test
@Intermittent
public void flickering() {
   // ...
}

You can then use the @RunWith annotation to run the test using the IntermittentTestRunner. Any @Before or @After methods will be run once for each test repetition. The example below also shows that the repetition count can be overridden on the method annotation.

@RunWith(IntermittentTestRunner.class)
public class IntermittentTestRunnerTest {

    private static int testCounter = 0;
    private static int afterCounter = 0;
    private static int afterClassCounter = 0;

    @Test
    @Intermittent(repetition = 99)
    public void annotatedTest() {
        testCounter++;
    }

    @After
    public void assertAfterIsCalledRepeatedlyForAnnotatedTests() {
        assertThat(testCounter, is(equalTo(++afterCounter)));
    }

    @AfterClass
    public static void assertAfterClassIsCalledOnce() {
        assertThat(++afterClassCounter, is(equalTo(1)));
    }

    @AfterClass
    public static void assertannotatedTestRunsMultipleTimes() {
        assertThat(testCounter, is(equalTo(99)));
    }
}

If you annotate the class rather than individual test methods, every test method of the class will be treated as if it were marked as @Intermittent.

@RunWith(IntermittentTestRunner.class)
@Intermittent(repetition = 10)
public class IntermittentTestRunnerTest {

    private static int testCounter = 0;

    @Test
    public void annotatedTest() {
        testCounter++;
    }

    @Test
    public void anotherAnnotatedTest() {
        testCounter++;
    }

    @AfterClass
    public static void assertAnnotatedTestRunsMultipleTimes() {
        assertThat(testCounter, is(equalTo(20)));
    }

}

Parallel Tests

The tempus-fugit library allows you to run tests methods within classes in parallel. Each test method within a class will run on its own thread and in parallel with any other test methods in that class. The number of threads for a given test class will be equal to the number of test methods within that class.

You can use the ConcurrentTestRunner runner to run all test methods within a class in parallel. Simply mark your test to be @RunWith the ConcurrentTestRunner class as below.

@RunWith(ConcurrentTestRunner.class)
public class ConcurrentTestRunnerTest {

    @Test
    public void shouldRunInParallel1() {
        System.out.println("I'm running on thread " + Thread.currentThread().getName());
    }

    @Test
    public void shouldRunInParallel2() {
        System.out.println("I'm running on thread " + Thread.currentThread().getName());
    }

    @Test
    public void shouldRunInParallel3() {
        System.out.println("I'm running on thread " + Thread.currentThread().getName());
    }
}        

In this example, each of the individual test methods are run once but in their own thread, all spawned roughly at the same time. The output from the above might look like the following.

I'm running on thread ConcurrentTestRunner-Thread-0
I'm running on thread ConcurrentTestRunner-Thread-2
I'm running on thread ConcurrentTestRunner-Thread-1 

Another run might yield a different interpolation.

I'm running on thread ConcurrentTestRunner-Thread-1
I'm running on thread ConcurrentTestRunner-Thread-0
I'm running on thread ConcurrentTestRunner-Thread-2 

Load / Soak Tests

The tempus-fugit library offers a RepeatingRule and ConcurrentRule that can be used to run a test method multiple times and across multiple threads. This can be useful when writing load or soak tests. For example, you might want to sanity check that your synchronisation is working under load. Both rules work use an annotation and can be used independently or together.

To run multiple instances of a single test method in parallel, you annotate the test method with the associated Concurrent annotation and declare the rule in your test class. For example, you might test the AtomicInteger class in a similar way to this.

public class RunConcurrentlyTest {

    @Rule public ConcurrentRule rule = new ConcurrentRule();

    private static final AtomicInteger counter = new AtomicInteger();

    @Test
    @Concurrent (count = 5)
    public void runsMultipleTimes() {
        counter.getAndIncrement();
    }
 } 

Here, the test method is run in parallel across five threads which may or may not expose potential threading issues. It's not recommended you use this as your only concurrent testing strategy but it may occasionally find a use.

To run a test method multiple times, you can take advantage of the Repeating annotation and declare the rule in your test class. This is similar to using the Intermittent annotation but unlike using the @RunWith mechanism, using the Rule will not execute any @Before or @After methods between runs. Using Repeating is also more explicit that you intend to run some kind of load test rather than indicating that a test is intermittently failing.

public class RepeatingRuleTest {

    @Rule public RepeatingRule rule = new RepeatingRule();

    private static int counter = 0;

    @Test
    @Repeating(repetition = 99)
    public void annotatedTest() {
        counter++;
    }

    @After
    public void annotatedTestRunsMultipleTimes() {
        assertThat(counter, is(99));
    }
} 

Combining the Concurrent with the Repeating annotation allows you to run a test method repeatedly and across threads. For example, running the following

public class RunConcurrentlyTest {

    @Rule public ConcurrentRule concurrently = new ConcurrentRule();
    @Rule public RepeatingRule repeatedly = new RepeatingRule();

    private static final AtomicInteger counter = new AtomicInteger();

    @Test
    @Concurrent (count = 5)
    @Repeating (repetition = 10)
    public void runsMultipleTimes() {
        counter.getAndIncrement();
    }
 } 

Will repeat the test method ten times in five threads. Each thread will run the test method ten times, so in our example, the counter will be incremented to fifty.

Wait for Assertions

The WaitFor class can be used to wait for a condition to hold. You can use it within your tests to wait for some asynchronous process to complete and so make an assertion in a test.

In the example below, a test verifies that the action of clicking a button will toggle some switch to a different position. However, this test may or may not pass if the clickButton() kicks off an asynchronous process to update the toggle position. It may return immediatly having not yet changed the position.

@Test
public void toggleButton() {
    assertThat(toggle, is(ON));
    clickButton();
    assertThat(toggle, is(OFF));
} 

Rewriting the test using a WaitFor would look like this

@Test
public void toggleButton() throws TimeoutException {
    assertThat(toggle, is(ON));
    clickButton();
    waitOrTimeout(new Condition() {
        public boolean isSatisfied() {
            return toggle == OFF;
        }
    }, seconds(5));
} 

Here, the test will retry the Condition for five seconds before finally giving up and failing the test. If the condition is true immediately, the wait will continue immediately and the test will continue. By extracting a method to create the Condition, you can further refactor the test to be more expressive and start to create a library of reusable conditions.

@Test
public void toggleButton() throws TimeoutException {
    assertThat(toggle, is(ON));
    clickButton();
    waitOrTimeout(toggleIs(OFF), seconds(5));
}

private Condition toggleIs(final Position position) {
    return new Condition() {
        public boolean isSatisfied() {
            return toggle == position;
        }
    };
}

These mechanism are more fully described in the time-dependence section.

Interrupt Capturing Thread Stub

It can be tricky to test that an interrupt has been called on a thread because of the possibility of race conditions between calling interrupt and checking the status of the interrupt flag using Thread.isInterrupted() or Thread.interrupted. For example, the interrupt status can be reset when a thread goes into the TERMINATED state. You can use the WaitFor class to express an assertion must be true within a given time (as below) but in this case, the race conditions can still occur (due to the frequency of the check whilst waiting). In the example below, the created thread will perform some blocking function that can be interrupted (for example, sleeping) and we're testing that the call to interrupt will wake and change the interrupt status flag (asserting against thread.isInterrupted).

@Test (timeout = 500)
public void interrupted() throws TimeoutException, InterruptedException {
    final Thread thread = new Thread(new Runnable(...));
    thread.start();
    waitOrTimeout(threadIsWaiting(thread), millis(500));
    thread.interrupt();
    waitOrTimeout(new Condition() {
        public boolean isSatisfied() {
            return thread.isInterrupted();
        }
    }, millis(500));
} 

It may be simpler to use a stub to capture the interrupt. The InterruptCapturingThread class of tempus-fugit is just a stub extending Thread which records and gives access to stack traces of threads that call interrupt on it.

@Test (timeout = 500)
public void interrupted() throws TimeoutException, InterruptedException {
    InterruptCapturingThread thread = new InterruptCapturingThread(new Runnable(...));
    thread.start();
    waitOrTimeout(threadIsWaiting(thread), millis(500));
    thread.interrupt();
    waitOrTimeout(not(threadIsWaiting(thread)), millis(500));
    assertThat(thread.getInterrupters().isEmpty(), is(false));
} 

For testing purposes, you can also get a view on the stack traces of the threads that called interrupt on your thread. Calling thread.printStackTraceOfInterruptingThreads(System.out) from the example above would show something like the following.

java.lang.Thread.getStackTrace(Thread.java:1409)
   com.google.code.tempusfugit.concurrency.InterruptCapturingThread.interrupt(InterruptCapturingThread.java:61)
   com.google.code.tempusfugit.concurrency.InterruptCapturingThreadTest.interrupted(InterruptCapturingThreadTest.java:39)
   sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
   sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
   java.lang.reflect.Method.invoke(Method.java:585)
   org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:44)
   org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
   org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:41)
   org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)
   org.junit.internal.runners.statements.FailOnTimeout$1.run(FailOnTimeout.java:28) 

Callables

Callable Adapter

The java.util.concurrent.Executors class has helper methods to convert from a Runnable to a Callable but lacks a counterpart to convert a Callable to a Runnable. This is presumably because a Runnable lacks the ability to throw exceptions or return values.

Both Runnable and Callable are handy interfaces to express lamda-like functionality in Java. Discounting their close relationship to concurrency mechanisms in Java, they both really just represent something that can be called. Choosing between the two, the Callable is more powerful.

The CallableAdapter class will convert from a Callable into a Runnable wrapping any exception as a RuntimeException and discarding any return type. This allows you to write general task code in the form of a Callable without necessarily being hampered if you need to pass in a Runnable to some framework code.

As a concrete example, Java's ScheduledExecutorService class doesn't allow you to schedule a task with a fixed delay or at a fixed rate. The interface takes a Runnable. However, using the adapter you can schedule a Callable at a fixed rate (ignoring the result). The underlying Callable task could then be used elsewhere where the result is actually used.

tempus-fugit Callable

The Callable interface extends java.util.concurrent.Callable but allows you to specify the exception as a generic type. This class has a fairly limited usage and isn't recommended over the standard Java version. Instead, if you create a method that takes a java.util.concurrent.Callable as a argument, calling it will force the method to throw Exception. If instead, you pass in a com.google.code.tempusfugit.concurrency.Callable, you can specify the specific exception to throw and therefore avoid being forced to throw Exception. For example

public <T> T foo(java.util.concurrent.Callable<T> callable) throws Exception {
    return callable.call();
} 

Whereas using the tempus-fugit version allows a more specific exception to be thrown.

public <T> T bar(com.google.code.tempusfugit.concurrency.Callable<T, RuntimeException> callable) {
    return callable.call();
}