Sprinkler - Advanced synchronization object





5.00/5 (1 vote)
Demonstrates a thread synchronization and memory sharing technique.
Introduction
Sprinkler is an advanced synchronization object, similar to a semaphore, but with added functionality.
Sprinkler enables you to externalize data from internal threads. In a high concurrency environment, there are many cases where one main thread needs to wait for one or more threads, and data needs to be shared among all the threads.
For instance, you may want to externalize exceptions from the inner threads to the main thread.
Background
Back in the day, we designed an asynchronous execution queue (an architecture for executing concurrent tasks in high scale). Our work methodology is TDD, so we started to think about testing our framework from day one. We learned very quickly that we need some kind of barrier that will help our main thread to wait (with timeout) for our framework's inner threads. Even this was not enough, because there were cases where the test expected an exception, but because the exception was being thrown by the inner thread, it could not be externalized to the main thread by simply doing a Java throw. Moreover, sometimes the test needed additional data from the inner threads, and we wanted to find a good way to transfer it. In order to fulfill our requirements, we needed some kind of semaphore and a shared memory. And so, Sprinkler was created.
Using the code
Sprinkler is based on the Singleton design pattern. It contains two primary methods: await and release. Sprinkler supports multiple semaphores (contexts). With each call to await and release, you must provide a unique identifier (context).
Here is an example of Sprinkler's capabilities - data and exception externalization:
public class TestSprinkler {
@Test
public void testAwait_ReleaserDeliversData() {
final int CONTEXT = 1;
final String DATA = "bla bla";
// release will occur sometime in the future
ExecutorServiceFactory.getCachedThreadPoolExecutor().submit(new Callable<void>() {
@Override
public Void call() throws Exception {
Sprinkler.getInstance().release(CONTEXT, DATA);
return null;
}
});
// waiting for the releaser thread
String data = (String) Sprinkler.getInstance().await(CONTEXT, 10000);
Assert.assertEquals(DATA, data);
}
@Test
public void testAwait_InnerThreadExternalizeException() {
final int CONTEXT = 1;
final String EXCEPTION_MESSAGE = "test inner thread exception message";
// release will occur sometime in the future - simulate exception in the releaser thread
ExecutorServiceFactory.getCachedThreadPoolExecutor().submit(new Callable<void>() {
@Override
public Void call() throws Exception {
Sprinkler.getInstance().release(CONTEXT, new RuntimeException(EXCEPTION_MESSAGE));
return null;
}
});
Throwable thrown = null;
try {
Sprinkler.getInstance().await(CONTEXT, 10000);
} catch (Throwable t) {
// if the releaser thread delivers exception it will be externelized to this thread
thrown = t;
}
Assert.assertTrue(thrown instanceof SprinklerException);
Assert.assertEquals(EXCEPTION_MESSAGE, thrown.getCause().getMessage());
}
@Test
public void testAwait_Timeout() {
final int CONTEXT = 1;
ExecutorServiceFactory.getCachedThreadPoolExecutor().submit(new Callable<void>() {
@Override
public Void call() throws Exception {
Thread.sleep(10000);
return null;
}
});
Throwable thrown = null;
try {
Sprinkler.getInstance().await(CONTEXT, 1);
} catch (Throwable t) {
thrown = t;
}
Assert.assertTrue(thrown.getCause() instanceof TimeoutException);
}
}
Sprinkler source code:
public class Sprinkler {
private static Sprinkler _instance = new Sprinkler();
private final ConcurrentMap<Integer, SprinklerData> _data =
new ConcurrentHashMap<Integer, SprinklerData>();
private Sprinkler() {}
public static Sprinkler getInstance() {
return _instance;
}
public void reset() {
_data.clear();
}
/**
* Locks the calling thread until someone will release it, or timeout will occur.
*
* @return data sent by releaser
*/
public Object await(int key, long timeout) {
SprinklerData data = null;
try {
data = getData(key);
doAwait(data.getLatch(), timeout);
externalizeException(data);
} finally {
_data.remove(key);
}
return data != null ? data.getInternal() : null;
}
public void release(int key) {
release(key, null, null);
}
public synchronized void release(int key, Object internalData) {
release(key, internalData, null);
}
public synchronized void release(int key, Throwable ex) {
release(key, null, ex);
}
/**
* Releases the lock on the waiting thread(s) for the given key, notifies them about
* the given exception.
*/
public synchronized void release(int key, Object internalData, Throwable ex) {
SprinklerData data = getData(key);
data.setInternal(internalData);
data.setAlreadyReleased(true);
if (ex != null) {
data.setException(ex);
}
notify(data.getLatch());
}
private synchronized SprinklerData getData(int key) {
SprinklerData data = _data.get(key);
if (data == null) {
data = new SprinklerData();
_data.put(key, data);
}
return data;
}
private void externalizeException(SprinklerData data) {
if (!isAlreadyReleased(data)) {
throw new SprinklerException(new TimeoutException());
}
Throwable thrown = data.getException();
if (thrown != null) {
throw new SprinklerException(thrown);
}
}
private void doAwait(CountDownLatch latch, long timeout) {
try {
latch.await(timeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException ex) {
throw new SprinklerException(ex);
}
}
private synchronized boolean isAlreadyReleased(SprinklerData data) {
return data.isAlreadyReleased();
}
private void notify(CountDownLatch lock) {
lock.countDown();
}
private static class SprinklerData {
private final CountDownLatch _latch;
private boolean _isAlreadyReleased = false;
private Throwable _thrown;
private Object _internal;
public SprinklerData() {
_latch = new CountDownLatch(1);
}
public Object getInternal() {
return _internal;
}
public void setInternal(Object data) {
_internal = data;
}
public CountDownLatch getLatch() {
return _latch;
}
public boolean isAlreadyReleased() {
return _isAlreadyReleased;
}
public void setAlreadyReleased(boolean isAlreadyReleased) {
_isAlreadyReleased = isAlreadyReleased;
}
public Throwable getException() {
return _thrown;
}
public void setException(Throwable thrown) {
_thrown = thrown;
}
@Override
public String toString() {
return String.format(
"SprinklerData [latch.count=%s, isAlreadyReleased=%s, internal=%s, thrown.message=%s]",
_latch == null ? "null latch" : _latch.getCount(),
_isAlreadyReleased,
_internal,
_thrown == null ? "null" : _thrown.getMessage());
}
}
}
Points of Interest
Another way of achieving this purpose is to use Thread Pool. A thread pool provides you with a way to centralize all currently running threads. Since it has control over all the threads running in the pool, you can use it to call wait on a thread.
Example:
public class TestThreadPoolJoiner {
@Test
public void testJoin() throws InterruptedException, ExecutionException {
final int TASKS = 10;
final AtomicInteger executedTasks = new AtomicInteger(0);
ThreadPoolJoiner joiner = new ThreadPoolJoiner();
for (int i = 0; i < TASKS; i++) {
joiner.submit(new Callable<Integer>() {
@Override
public Integer call() {
return executedTasks.incrementAndGet();
}
});
}
joiner.join();
Assert.assertEquals(TASKS, executedTasks.get());
}
}
public class ThreadPoolJoiner extends ThreadPoolExecutor {
Collection<Future<?>> _tasks = new CopyOnWriteArrayList<Future<?>>();
public ThreadPoolJoiner() {
super(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
}
public void join() throws InterruptedException, ExecutionException {
for (Future<?> currTask : _tasks) {
currTask.get();
}
}
@Override
public <T> Future<T> submit(Callable<T> task) {
Future<T> ret = super.submit(task);
_tasks.add(ret);
return ret;
}
}