The com.google.code.tempusfugit.concurrency package contains classes and interfaces
designed to help when developing concurrent code.
Runnable using Callable AdapterOften, you'll see code like the example below
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// nothing
}
tempus-fugit captures the annoying boiler plate code needed to reset the interrupt flat in situations where you
can't or don't want to rethrow the InterruptedException.
Using the ThreadUtils.sleep method, the above code is rewritten as.
sleep(millis(100));
This ensures that the interrupt flag is reset and is more explicit about the duration of the sleep.
If you want to ensure the interrupt flag is reset for other code, you can use the ThreadUtils.resetInterruptFlagWhen
method directly. The Interruptible interface is used to highlight that the lamda-like call
you want to execute does in fact throw the InterruptedException. For example;
resetInterruptFlagWhen(new Interruptible<Void>() {
public Void call() throws InterruptedException {
Thread.sleep(100);
return null;
}
});
Extracting the lamda-like Interruptible to a method makes the code more expressive;
resetInterruptFlagWhen(sleepingIsInterrupted());
This is actually how the ThreadUtils.sleep method is implemented within tempus-fugit.
The Interrupter class allows you to schedule an interrupt on a thread after a
specified duration. This can be useful when implementing timeouts on classes that support the use of interrupt as an interruption policy. For example, the code below
sets up an interrupt to be scheduled after some timeout, before embarking on some potentially long running process. The Interrupter and Thread classes
have been statically imported.
Interrupter interrupter = interrupt(currentThread()).after(timeout);
try {
while (!currentThread().isInterrupted()) {
// some long running process
}
} finally {
interrupter.cancel();
}
The Interrupter spawns a thread which sleeps (using WaitFor) until the timeout expires.
It then just calls interrupt on the passed in thread. It is important therefore to ensure you cancel the interrupt as above for the case when the long running process
could finish before the timeout. The cancel has no affect if the timeout has already expired so using a finally block is recommended.
The DefaultTimeoutableCompletionService classes uses this
approach to implement a java.util.concurrent.CompletionService-like service that will timeout and return any completed tasks and abandoning any remaining.
The ThreadDump class offers a programmatic way to print a thread dump. It's not recommended for production
code but can be handy in tracking down unexpected behaviour interactively. Using ThreadDump.dumpThreads(System.out),
for example, would yield something like the following (formatting inspired by jstack).
Thread Reference Handler@2: (state = WAITING) - java.lang.Object.wait(Native Method) - java.lang.Object.wait(Object.java:474) - java.lang.ref.Reference$ReferenceHandler.run(Reference.java:116) Thread main@1: (state = RUNNABLE) - java.lang.Thread.dumpThreads(Native Method) - java.lang.Thread.getAllStackTraces(Thread.java:1460) - com.google.code.tempusfugit.concurrency.ThreadDump.dumpThreads(ThreadDump.java:25) ... - com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:118) - com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:40) Thread Signal Dispatcher@4: (state = RUNNABLE) Thread Finalizer@3: (state = WAITING) - java.lang.Object.wait(Native Method) - java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:120) - java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:136) - java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:159)
The dumpThreads method will also detect any Java-level deadlocks, see the
Deadlock Detection section for details.
The DeadlockDetector class allows you to programmatically detect basic deadlocks in your
Java code. You can output deadlocks using the following code (note that printing a thread dump using the ThreadDump class will automatically attempt to find
any deadlocks).
DeadlockDetector.printDeadlocks(System.out);
There are various types of deadlock in concurrent systems, broadly speaking with regard to Java, they can be categorised as
Lock cyclic locking dependency
The DeadlockDecector class can only spot Java monitor cyclic locking problems. It's implementation
is basically the same as that used by jconsole and jstack and suffers the same limitations.
Java 1.6 versions of jstack and jconsole can additionally detect Lock
based cyclic problems. The types of deadlock it can detect can be illustrated in the example below.
public void potentialDeadlock() {
new Kidnapper().start();
new Negotiator().start();
}
public class Kidnapper extends Thread {
public void run() {
synchronized (nibbles) {
synchronized (cash) {
take(cash);
}
}
}
}
public class Negotiator extends Thread {
public void run() {
synchronized (cash) {
synchronized (nibbles) {
take(nibbles);
}
}
}
}
Here, the Kidnapper is unwilling to release poor Nibbles the Cat until he
has the Cash but our Negotiator is unwilling to part with the Cash
until he has poor Nibbles back in his arms. The deadlock detector displays this woeful situation as follows.
Deadlock detected
=================
"Negotiator-Thread-1":
waiting to lock Monitor of com.google.code.tempusfugit.concurrency.DeadlockDetectorTest$Cat@ce4a8a
which is held by "Kidnapper-Thread-0"
"Kidnapper-Thread-0":
waiting to lock Monitor of com.google.code.tempusfugit.concurrency.DeadlockDetectorTest$Cash@7fc8b2
which is held by "Negotiator-Thread-1"
The Conditions class
offers some thread related Conditions including the following.
Conditions.isWaiting(Thread thread)
The static method Conditions.isWaiting(Thread) offers a Condition that allows you to test
if a thread is in a waiting state. Combining the condition with some classes from the
temporal package allows you to wait for a
thread to be in the waiting state. For example,
waitOrTimeout(isWaiting(thread), timeout(seconds(10)));
Conditions.is(Thread thread, Thread.State state)
The static method Conditions.is(Thread, ThreadState) offers a more general purpose
Condition to check that a thread is in a given state. For example,
waitOrTimeout(is(thread, TERMINATED), timeout(seconds(10)));
Conditions.isAlive(Thread thread)
The static method Conditions.isAlive(Thread will check that a thread is alive. A
thread is alive if it has been started and has not yet died. For example,
System.out.println(isAlive(Thread.currentThread()));
Conditions.shutdown(ExecutorService service)
This method will check that a java.util.concurrent.ExecutorService has been
shutdown according to the result of the it's isShutdown method. This might be
useful if you'd like to wait for shutdown. The tempus-fugit
ExecutorServiceShutdown
class does just this.
waitOrTimeout(Conditions.shutdown(executor), timeout);
Conditions.not(Condition condition)
The NotCondition will negate the logical result of some other condition. For
example, we can change the example above to wait until a thread is not in a waiting state
by using the following.
waitOrTimeout(not(isWaiting(thread)), timeout(seconds(10)));
Callable block catching any Throwable as
an exception of your choice, embedded the originating exception as the new exception's cause. For example,
ExceptionWrapper.wrapAnyException(new Callable<Object>() {
@Override
public Object call() throws ServiceException {
// nasty code throwing a bunch of exceptions
}
}, WithException.with(CalendarException.class));
You can also catch any Exception and rethrow as a RuntimeException with the
originating exception as the cause. The example below shows the anonymous Callable
being created in the method something.
ExceptionWrapper.wrapAsRuntimeException(something());
DefaultThreadFactory offers an implementation of java.util.concurrent.ThreadFactory
that creates a thread using the single argument constructor of Thread.
Using an instance of a java.util.concurrent.CountDownLatch, we can wait for the latch to count
down to zero, blocking the calling thread before continuing. When passing in a timeout, the method returns true
if the count reached zero or false if the timeout expires.
To make the timeout more explicit, the CountDownLatchWithTimeout
class will throw a TimeoutException rather than force you to check. Using a static import, the
example looks like the following.
private final CountDownLatch startup = new CountDownLatch(1);
public void waitForStartup() throws InterruptedException, TimeoutException {
await(startup).with(TIMEOUT);
}
The use of the with method is required. Following the micro-DSL
approach, it is the with that actually does the waiting. Calling the await method
on it's own will not block.
Using implementations of the java.util.concurrent.locks.Lock requires that the lock is acquired and importantly released
after use. This leads to the common code below.
Lock lock = new ReentrantLock();
lock.lock;
try {
// something useful
} finally {
lock.unlock();
}
The ExecuteUsingLock
class provides a way to abstract the lock acquisition and release and ensure consistency across your code. It
takes a Callable representing the statements to execute whilst the lock is acquired and the actual
lock to use when executing and ensures the lock is always released.
public void forExample {
execute(something()).using(lock);
}
private Callable<Void, RuntimeException> something() {
return new Callable<Void, RuntimeException>() {
public Void call() throws RuntimeException {
return null;
}
};
}
The TimeoutableCompletionService
interface describes a service similar to a java.util.concurrent.CompletionService except that it
will timeout after a specified duration if all results havn't yet been retrieved.
The default implementation, DefaultTimeoutableCompletionService,
delegates to an underlying CompletionService but will stop waiting for submitted tasks to complete
after a given timeout.
If the timeout expires before all tasks have been completed, the DefaultTimeoutableCompletionService
will throw a TimeoutException. The exception thrown will also contain the results from any tasks
that did manage to complete before the timeout. If all the submitted tasks complete before the timeout expires, the
results are returned from the submit method and no exceptions are thrown.
In the example below, we'd like to create a status monitoring application that will output the status of a set of probes to a web page. However, the web page must be loaded within a few seconds regardless of whether all the probes have returned their results. The completion service can be used to execute individual status probe tasks in parallel, timing out after some duration.
private DefaultTimeoutableCompletionService completionService = new DefaultTimeoutableCompletionService(new ExecutorCompletionService(...));
public void probe() {
try {
List<Callable<Result>> probes = factory.create();
List<Result> results = completionService.submit(probes);
write(results);
} catch (TimeoutExceptionWithFutures e) {
write(getResultsFrom(e));
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
}
Here, the list of tasks to run in parallel is created by some factory class and assigned to the probes
variable. These are just a list of Callable objects which will be executed by the underlying
Executor, in our case, they'll look up the status of some probe and return a Result
object.
The DefaultTimeoutableCompletionService is then used to schedule execution of the tasks with the
submit method. At this point, the code will block and wait for all results to be returned or for
the TimeoutExceptionWithFutures
to be thrown. In the case of all tasks completing within the timeout, the results are just outputted using the
write method.
For the case where the completion service timed out, a TimeoutException is thrown which includes
any completed results. The write method in this case can just extract the partial set of results
from the exception.
The default timeout for the service is thirty seconds but this can be changed using an alternative constructor.
The majority of abstractions and classes from tempus-fugit can be used within your testing framework of choice but there are also a couple of direct junit integrations that may be of use to you.
As much as possible, you aim to have a completely deterministic tests but despite your best efforts,
the odd flickering test can still get through. Occasionally, you might want to run such a test repeatedly to
get an idea of its indeterminacy. The Intermittent
annotation can be combined with the IntermittentTestRunner
to provide this behaviour along side junit.
You simply mark a junit test method (or class) as potentially intermittent using the Intermittent
annotation as follows.
@Test
@Intermittent
public void flickering() {
// ...
}
You can then use the @RunWith annotation to run the test using the IntermittentTestRunner.
Any @Before or @After methods will be run once for each test repetition. The
example below also shows that the repetition count can be overridden on the method annotation.
@RunWith(IntermittentTestRunner.class)
public class IntermittentTestRunnerTest {
private static int testCounter = 0;
private static int afterCounter = 0;
private static int afterClassCounter = 0;
@Test
@Intermittent(repetition = 99)
public void annotatedTest() {
testCounter++;
}
@After
public void assertAfterIsCalledRepeatedlyForAnnotatedTests() {
assertThat(testCounter, is(equalTo(++afterCounter)));
}
@AfterClass
public static void assertAfterClassIsCalledOnce() {
assertThat(++afterClassCounter, is(equalTo(1)));
}
@AfterClass
public static void assertannotatedTestRunsMultipleTimes() {
assertThat(testCounter, is(equalTo(99)));
}
}
If you annotate the class rather than individual test methods, every test method of the class will be
treated as if it were marked as @Intermittent.
@RunWith(IntermittentTestRunner.class)
@Intermittent(repetition = 10)
public class IntermittentTestRunnerTest {
private static int testCounter = 0;
@Test
public void annotatedTest() {
testCounter++;
}
@Test
public void anotherAnnotatedTest() {
testCounter++;
}
@AfterClass
public static void assertAnnotatedTestRunsMultipleTimes() {
assertThat(testCounter, is(equalTo(20)));
}
}
The tempus-fugit library allows you to run tests methods within classes in parallel. Each test method within a class will run on its own thread and in parallel with any other test methods in that class. The number of threads for a given test class will be equal to the number of test methods within that class.
You can use the ConcurrentTestRunner
runner to run all test methods within a class in parallel. Simply mark your test to be @RunWith
the ConcurrentTestRunner class as below.
@RunWith(ConcurrentTestRunner.class)
public class ConcurrentTestRunnerTest {
@Test
public void shouldRunInParallel1() {
System.out.println("I'm running on thread " + Thread.currentThread().getName());
}
@Test
public void shouldRunInParallel2() {
System.out.println("I'm running on thread " + Thread.currentThread().getName());
}
@Test
public void shouldRunInParallel3() {
System.out.println("I'm running on thread " + Thread.currentThread().getName());
}
}
In this example, each of the individual test methods are run once but in their own thread, all spawned roughly at the same time. The output from the above might look like the following.
I'm running on thread ConcurrentTestRunner-Thread-0 I'm running on thread ConcurrentTestRunner-Thread-2 I'm running on thread ConcurrentTestRunner-Thread-1
Another run might yield a different interpolation.
I'm running on thread ConcurrentTestRunner-Thread-1 I'm running on thread ConcurrentTestRunner-Thread-0 I'm running on thread ConcurrentTestRunner-Thread-2
The tempus-fugit library offers a RepeatingRule
and ConcurrentRule
that can be used to run a test method multiple times and across multiple threads. This can be useful when
writing load or soak tests. For example, you might want to sanity check that your synchronisation is working
under load. Both rules work use an annotation and can be used independently or together.
To run multiple instances of a single test method in parallel, you annotate the test method with the
associated Concurrent
annotation and declare the rule in your test class. For example, you might test the
AtomicInteger class in a similar way to this.
public class RunConcurrentlyTest {
@Rule public ConcurrentRule rule = new ConcurrentRule();
private static final AtomicInteger counter = new AtomicInteger();
@Test
@Concurrent (count = 5)
public void runsMultipleTimes() {
counter.getAndIncrement();
}
}
Here, the test method is run in parallel across five threads which may or may not expose potential threading issues. It's not recommended you use this as your only concurrent testing strategy but it may occasionally find a use.
To run a test method multiple times, you can take advantage of the Repeating
annotation and declare the rule in your test class. This is similar to using the Intermittent
annotation but unlike using the @RunWith mechanism, using the Rule will not
execute any @Before or @After methods between runs. Using Repeating is
also more explicit that you intend to run some kind of load test rather than indicating that a test is intermittently
failing.
public class RepeatingRuleTest {
@Rule public RepeatingRule rule = new RepeatingRule();
private static int counter = 0;
@Test
@Repeating(repetition = 99)
public void annotatedTest() {
counter++;
}
@After
public void annotatedTestRunsMultipleTimes() {
assertThat(counter, is(99));
}
}
Combining the Concurrent
with the Repeating
annotation allows you to run a test method repeatedly and across threads. For example, running the following
public class RunConcurrentlyTest {
@Rule public ConcurrentRule concurrently = new ConcurrentRule();
@Rule public RepeatingRule repeatedly = new RepeatingRule();
private static final AtomicInteger counter = new AtomicInteger();
@Test
@Concurrent (count = 5)
@Repeating (repetition = 10)
public void runsMultipleTimes() {
counter.getAndIncrement();
}
}
Will repeat the test method ten times in five threads. Each thread will run the test method ten times, so in our example, the counter will be incremented to fifty.
The WaitFor class can be
used to wait for a condition to hold. You can use it within your tests to wait for some asynchronous process
to complete and so make an assertion in a test.
In the example below, a test verifies that the action of clicking a button will toggle some switch to a
different position. However, this test may or may not pass if the clickButton() kicks off an
asynchronous process to update the toggle position. It may return immediatly having not yet changed the position.
@Test
public void toggleButton() {
assertThat(toggle, is(ON));
clickButton();
assertThat(toggle, is(OFF));
}
Rewriting the test using a WaitFor would look like this
@Test
public void toggleButton() throws TimeoutException {
assertThat(toggle, is(ON));
clickButton();
waitOrTimeout(new Condition() {
public boolean isSatisfied() {
return toggle == OFF;
}
}, seconds(5));
}
Here, the test will retry the Condition for five seconds before finally giving up and failing the
test. If the condition is true immediately, the wait will continue immediately and the test will continue.
By extracting a method to create the Condition, you can further refactor the test to be more
expressive and start to create a library of reusable conditions.
@Test
public void toggleButton() throws TimeoutException {
assertThat(toggle, is(ON));
clickButton();
waitOrTimeout(toggleIs(OFF), seconds(5));
}
private Condition toggleIs(final Position position) {
return new Condition() {
public boolean isSatisfied() {
return toggle == position;
}
};
}
These mechanism are more fully described in the time-dependence section.
It can be tricky to test that an interrupt has been called on a thread because of the possibility of race conditions between calling interrupt and checking
the status of the interrupt flag using Thread.isInterrupted() or Thread.interrupted. For example, the interrupt status can be reset when a thread
goes into the TERMINATED state. You can use the WaitFor class to express an assertion must be true within a given time (as below) but in this case,
the race conditions can still occur (due to the frequency of the check whilst waiting). In the example below, the created thread will perform some blocking function that can
be interrupted (for example, sleeping) and we're testing that the call to interrupt will wake and change the interrupt status flag (asserting against thread.isInterrupted).
@Test (timeout = 500)
public void interrupted() throws TimeoutException, InterruptedException {
final Thread thread = new Thread(new Runnable(...));
thread.start();
waitOrTimeout(threadIsWaiting(thread), millis(500));
thread.interrupt();
waitOrTimeout(new Condition() {
public boolean isSatisfied() {
return thread.isInterrupted();
}
}, millis(500));
}
It may be simpler to use a stub to capture the interrupt. The InterruptCapturingThread class of tempus-fugit is just a stub extending Thread
which records and gives access to stack traces of threads that call interrupt on it.
@Test (timeout = 500)
public void interrupted() throws TimeoutException, InterruptedException {
InterruptCapturingThread thread = new InterruptCapturingThread(new Runnable(...));
thread.start();
waitOrTimeout(threadIsWaiting(thread), millis(500));
thread.interrupt();
waitOrTimeout(not(threadIsWaiting(thread)), millis(500));
assertThat(thread.getInterrupters().isEmpty(), is(false));
}
For testing purposes, you can also get a view on the stack traces of the threads that called interrupt on your thread. Calling thread.printStackTraceOfInterruptingThreads(System.out)
from the example above would show something like the following.
java.lang.Thread.getStackTrace(Thread.java:1409) com.google.code.tempusfugit.concurrency.InterruptCapturingThread.interrupt(InterruptCapturingThread.java:61) com.google.code.tempusfugit.concurrency.InterruptCapturingThreadTest.interrupted(InterruptCapturingThreadTest.java:39) sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) java.lang.reflect.Method.invoke(Method.java:585) org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:44) org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15) org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:41) org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20) org.junit.internal.runners.statements.FailOnTimeout$1.run(FailOnTimeout.java:28)
The java.util.concurrent.Executors class has helper methods to convert from a
Runnable to a Callable but lacks a counterpart to convert a Callable
to a Runnable. This is presumably because a Runnable lacks the ability to
throw exceptions or return values.
Both Runnable and Callable are handy interfaces to express lamda-like functionality
in Java. Discounting their close relationship to concurrency mechanisms in Java, they both really
just represent something that can be called. Choosing between the two, the Callable is
more powerful.
The CallableAdapter class will convert from a Callable into a Runnable
wrapping any exception as a RuntimeException and discarding any return type. This allows
you to write general task code in the form of a Callable without necessarily being hampered
if you need to pass in a Runnable to some framework code.
As a concrete example, Java's ScheduledExecutorService class doesn't allow you to schedule
a task with a fixed delay or at a fixed rate. The interface takes a Runnable. However,
using the adapter you can schedule a Callable at a fixed rate (ignoring the result). The
underlying Callable task could then be used elsewhere where the result is actually used.
The Callable interface extends java.util.concurrent.Callable but allows you to specify the
exception as a generic type. This class has a fairly limited usage and isn't recommended over the standard
Java version. Instead, if you create a method that takes a java.util.concurrent.Callable as a
argument, calling it will force the method to throw Exception. If instead, you pass in a
com.google.code.tempusfugit.concurrency.Callable, you can specify the specific exception to
throw and therefore avoid being forced to throw Exception. For example
public <T> T foo(java.util.concurrent.Callable<T> callable) throws Exception {
return callable.call();
}
Whereas using the tempus-fugit version allows a more specific exception to be thrown.
public <T> T bar(com.google.code.tempusfugit.concurrency.Callable<T, RuntimeException> callable) {
return callable.call();
}