Click here to Skip to main content
Click here to Skip to main content

Tagged as

Sprinkler - Advanced synchronization object

, , 26 May 2014 CPOL
Rate this:
Please Sign up or sign in to vote.
Demonstrates a thread synchronization and memory sharing technique.

Introduction

Sprinkler is an advanced synchronization object, similar to a semaphore, but with added functionality. Sprinkler enables you to externalize data from internal threads. In a high concurrency environment, there are many cases where one main thread needs to wait for one or more threads, and data needs to be shared among all the threads. For instance, you may want to externalize exceptions from the inner threads to the main thread.

Background

Back in the day, we designed an asynchronous execution queue (an architecture for executing concurrent tasks in high scale). Our work methodology is TDD, so we started to think about testing our framework from day one. We learned very quickly that we need some kind of barrier that will help our main thread to wait (with timeout) for our framework's inner threads. Even this was not enough, because there were cases where the test expected an exception, but because the exception was being thrown by the inner thread, it could not be externalized to the main thread by simply doing a Java throw. Moreover, sometimes the test needed additional data from the inner threads, and we wanted to find a good way to transfer it. In order to fulfill our requirements, we needed some kind of semaphore and a shared memory. And so, Sprinkler was created.

Using the code

Sprinkler is based on the Singleton design pattern. It contains two primary methods: await and release. Sprinkler supports multiple semaphores (contexts). With each call to await and release, you must provide a unique identifier (context).

Here is an example of Sprinkler's capabilities - data and exception externalization:

public class TestSprinkler {

    @Test
    public void testAwait_ReleaserDeliversData() {
        
        final int CONTEXT = 1;
        final String DATA = "bla bla";

        // release will occur sometime in the future
        ExecutorServiceFactory.getCachedThreadPoolExecutor().submit(new Callable<void>() {
            
            @Override
            public Void call() throws Exception {
                
                Sprinkler.getInstance().release(CONTEXT, DATA);
                
                return null;
            }
            
        });

        // waiting for the releaser thread
        String data = (String) Sprinkler.getInstance().await(CONTEXT, 10000);
        Assert.assertEquals(DATA, data);
    }

    @Test
    public void testAwait_InnerThreadExternalizeException() {
        
        final int CONTEXT = 1;
        final String EXCEPTION_MESSAGE = "test inner thread exception message";
        
        // release will occur sometime in the future - simulate exception in the releaser thread
        ExecutorServiceFactory.getCachedThreadPoolExecutor().submit(new Callable<void>() {
            
            @Override
            public Void call() throws Exception {
                
                Sprinkler.getInstance().release(CONTEXT, new RuntimeException(EXCEPTION_MESSAGE));
                
                return null;
            }
            
        });
        
        Throwable thrown = null;
        try {
            Sprinkler.getInstance().await(CONTEXT, 10000);
        } catch (Throwable t) {
            // if the releaser thread delivers exception it will be externelized to this thread
            thrown = t;
        }
        Assert.assertTrue(thrown instanceof SprinklerException);
        Assert.assertEquals(EXCEPTION_MESSAGE, thrown.getCause().getMessage());
    }

    @Test
    public void testAwait_Timeout() {
        
        final int CONTEXT = 1;
        
        ExecutorServiceFactory.getCachedThreadPoolExecutor().submit(new Callable<void>() {
            
            @Override
            public Void call() throws Exception {
                
                Thread.sleep(10000);
                
                return null;
            }
            
        });
        
        Throwable thrown = null;
        try {
            Sprinkler.getInstance().await(CONTEXT, 1);
        } catch (Throwable t) {
            thrown = t;
        }
        Assert.assertTrue(thrown.getCause() instanceof TimeoutException);
    }
}

Sprinkler source code:

public class Sprinkler {
    
    private static Sprinkler _instance = new Sprinkler();
    
    private final ConcurrentMap<Integer, SprinklerData> _data =
            new ConcurrentHashMap<Integer, SprinklerData>();
    
    private Sprinkler() {}
    
    public static Sprinkler getInstance() {
        
        return _instance;
    }
    
    public void reset() {
        
        _data.clear();
    }
    
    /**
     * Locks the calling thread until someone will release it, or timeout will occur. 
     * 
     * @return data sent by releaser
     */
    public Object await(int key, long timeout) {
        
        SprinklerData data = null;
        try {
            data = getData(key);
            doAwait(data.getLatch(), timeout);
            externalizeException(data);
        } finally {
            _data.remove(key);
        }
        
        return data != null ? data.getInternal() : null;
    }
    
    public void release(int key) {
        
        release(key, null, null);
    }
    
    public synchronized void release(int key, Object internalData) {
        
        release(key, internalData, null);
    }
    
    public synchronized void release(int key, Throwable ex) {
        
        release(key, null, ex);
    }
    
    /**
     * Releases the lock on the waiting thread(s) for the given key, notifies them about
     * the given exception.
     */
    public synchronized void release(int key, Object internalData, Throwable ex) {
        
        SprinklerData data = getData(key);
        data.setInternal(internalData);
        data.setAlreadyReleased(true);
        if (ex != null) {
            data.setException(ex);
        }
        notify(data.getLatch());
    }
    
    private synchronized SprinklerData getData(int key) {
        
        SprinklerData data = _data.get(key);
        if (data == null) {
            data = new SprinklerData();
            _data.put(key, data);
        }
        
        return data;
    }
    
    private void externalizeException(SprinklerData data) {
        
        if (!isAlreadyReleased(data)) {
            throw new SprinklerException(new TimeoutException());
        }
        Throwable thrown = data.getException();
        if (thrown != null) {
            throw new SprinklerException(thrown);
        }
    }
    
    private void doAwait(CountDownLatch latch, long timeout) {
        
        try {
            latch.await(timeout, TimeUnit.MILLISECONDS);
        } catch (InterruptedException ex) {
            throw new SprinklerException(ex);
        }
    }
    
    private synchronized boolean isAlreadyReleased(SprinklerData data) {
        
        return data.isAlreadyReleased();
    }
    
    private void notify(CountDownLatch lock) {
        
        lock.countDown();
    }
    
    private static class SprinklerData {
        
        private final CountDownLatch _latch;
        private boolean _isAlreadyReleased = false;
        private Throwable _thrown;
        private Object _internal;
        
        public SprinklerData() {
            
            _latch = new CountDownLatch(1);
        }
        
        public Object getInternal() {
            
            return _internal;
        }
        
        public void setInternal(Object data) {
            
            _internal = data;
        }
        
        public CountDownLatch getLatch() {
            
            return _latch;
        }
        
        public boolean isAlreadyReleased() {
            
            return _isAlreadyReleased;
        }
        
        public void setAlreadyReleased(boolean isAlreadyReleased) {
            
            _isAlreadyReleased = isAlreadyReleased;
        }
        
        public Throwable getException() {
            
            return _thrown;
        }
        
        public void setException(Throwable thrown) {
            
            _thrown = thrown;
        }
        
        @Override
        public String toString() {
            
            return String.format(
                    "SprinklerData [latch.count=%s, isAlreadyReleased=%s, internal=%s, thrown.message=%s]",
                    _latch == null ? "null latch" : _latch.getCount(),
                    _isAlreadyReleased,
                    _internal,
                    _thrown == null ? "null" : _thrown.getMessage());
        }
    }
}

Points of Interest

Another way of achieving this purpose is to use Thread Pool. A thread pool provides you with a way to centralize all currently running threads. Since it has control over all the threads running in the pool, you can use it to call wait on a thread.

Example:

public class TestThreadPoolJoiner {
    
    @Test
    public void testJoin() throws InterruptedException, ExecutionException {
        
        final int TASKS = 10;
        final AtomicInteger executedTasks = new AtomicInteger(0);
        ThreadPoolJoiner joiner = new ThreadPoolJoiner();
        
        for (int i = 0; i < TASKS; i++) {
            joiner.submit(new Callable<Integer>() {
                
                @Override
                public Integer call() {
                    
                    return executedTasks.incrementAndGet();
                }
            });
        }
        
        joiner.join();
        Assert.assertEquals(TASKS, executedTasks.get());
    }
}

public class ThreadPoolJoiner extends ThreadPoolExecutor {
    
    Collection<Future<?>> _tasks = new CopyOnWriteArrayList<Future<?>>();
    
    public ThreadPoolJoiner() {
        
        super(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
    }
    
    public void join() throws InterruptedException, ExecutionException {
        
        for (Future<?> currTask : _tasks) {
            currTask.get();
        }
    }
    
    @Override
    public <T> Future<T> submit(Callable<T> task) {
        
        Future<T> ret = super.submit(task);
        _tasks.add(ret);
        
        return ret;
    }
} 

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)

Share

About the Authors

No Biography provided

No Biography provided

Comments and Discussions

 
GeneralMy vote of 5 Pinmemberxfzheng27-May-14 16:06 
GeneralRe: My vote of 5 PinmemberLital Kornfeld5-Jun-14 7:58 
GeneralRe: My vote of 5 Pinmemberxfzheng5-Jun-14 19:51 
GeneralRe: My vote of 5 PinmemberLital Kornfeld6-Jun-14 3:55 
GeneralRe: My vote of 5 Pinmemberxfzheng6-Jun-14 5:37 
GeneralMy vote of 5 PinmemberArash M. Dehghani2-Aug-13 2:02 

General General    News News    Suggestion Suggestion    Question Question    Bug Bug    Answer Answer    Joke Joke    Rant Rant    Admin Admin   

Use Ctrl+Left/Right to switch messages, Ctrl+Up/Down to switch threads, Ctrl+Shift+Left/Right to switch pages.

| Advertise | Privacy | Terms of Use | Mobile
Web03 | 2.8.141223.1 | Last Updated 26 May 2014
Article Copyright 2013 by Effi Bar-She'an, Lital Kornfeld
Everything else Copyright © CodeProject, 1999-2014
Layout: fixed | fluid