Click here to Skip to main content
15,886,664 members
Please Sign up or sign in to vote.
2.00/5 (1 vote)
I have a requirement where I have to create a queue for each calling object. Then there is a method called "processMessages" which need to be scheduled to poll messages from this queue and send do some processing. When a call from another object is received, the processMessages need to be scheduled to poll messages from queue related to the new object and do some processing. I implemented this in the below manner. But everytime a new object calls, the scheduling of processMessages method is overwritten to the object who called most recently. The earlier scheduled ones are not running. How can I schedule this processMessages to run with different parameters in different threads? Please advice.

Below is the method which schedules to run processMessages each time an object calls this. An object would call this only once. But multiple objects would call this.

Java
public synchronized void subscribe(String[] catogeries, MessageListener onMessage) {
    timer = new Timer();
    subscribedMessageListener = onMessage;
    this.messageListenerItem = new MessageListenerItem(onMessage, catogeries);
    this.messageListnerItemList.add(messageListenerItem);
    messageQueue = new ArrayBlockingQueue<>(10);
    this.messageListenerMessageQueueMap.put(onMessage, messageQueue);
    timer.scheduleAtFixedRate(new TimerTask() {
        public void run() {
            processMessages(subscribedMessageListener, messageListenerItem);
        }

    }, 0, 2000);

}

Below is the logic of the processMessages method.

Java
private void processMessages(MessageListener onMessage, MessageListenerItem messageListenerItem) {

    System.out.println("Thread is running for : " + onMessage.toString());

    BlockingQueue<Message> queueToBeProcessed = messageListenerMessageQueueMap.get(onMessage);
    Message messageToBeBroadcasted = queueToBeProcessed.poll();
    while (messageToBeBroadcasted != null) {
        messageListenerItem.sendMessage(messageToBeBroadcasted);
        messageToBeBroadcasted = queueToBeProcessed.poll();
    }
}
Posted
Updated 27-Dec-15 22:27pm
v2

This content, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)



CodeProject, 20 Bay Street, 11th Floor Toronto, Ontario, Canada M5J 2N8 +1 (416) 849-8900