Hi,

I am using Reactive Extensions (Rx) to handle events raised from a Windows service. Here is the DataParser class:

C#
using System;
using System.Reactive.Linq;

namespace MessageQueue
{
    public class DataParser
    {
        public DataParser()
        {
            var fwLanded = Observable.FromEvent<ThreadQueue.ArrivedHandler, ServiceData>(handler =>
                {
                    ThreadQueue.ArrivedHandler fHandler = (e) =>
                        {//TODO: Fires twice!
                            handler(e);
                        };
                    return fHandler;
                },
                fHandler => ThreadQueue.ArrivedEvent += fHandler,
                fHandler => ThreadQueue.ArrivedEvent -= fHandler
                );

            // One group per Instance; each group feeds its own LoadedParser.
            fwLanded.GroupBy(fwl => fwl.Instance).Subscribe(g =>
                {
                    LoadedParser loadedParser = new LoadedParser(g.Key);
                    g.Subscribe(grouped =>
                        loadedParser.Add(grouped.tracker, grouped.trackerInstance)
                        );
                }
                );
        }
    }
}


ThreadQueue is entirely static, so when the service starts I instantiate DataParser and nothing more.

The event is raised from the queue class here:
C#
public class DataEventArgs : EventArgs
{
    public ServiceData ServiceData;
}
public delegate void ArrivedHandler(ServiceData data);
public static event ArrivedHandler ArrivedEvent;

private static void OnCheck(ServiceData data)
{
    if (ArrivedEvent != null)
        ArrivedEvent(data);
}

private static void ProcessMessage(Message message, int threadIndex)
{
    message.MarkStarted();
    Monitor fm = new Monitor(message.trackerInstanceId);
    ServiceData data = fm.Check();
    if (data != null)
    {
        if (data.IsTracked)
        {
            if (data.IsLoaded)
                OnCheck(data);
            message.MarkComplete();
        }
        else
        {
            message.MarkComplete();
            RequeueMessage(message, 10);
            //TODO: found mismatch!
        }
    }
    else
    {
        RequeueMessage(message, 10);
    }
    _waitHandles[threadIndex].Set();
}


(Forgive the variable and method naming. I have cut words to hide the nature of the project.)

I have debugged the code. Each message is handled once (the same message is never accidentally picked up twice), ProcessMessage in ThreadQueue runs once per message, and ArrivedEvent in OnCheck is raised once per message. And yet here is the problem:

fHandler fires twice >_<
I cannot see why. I am currently checking the database to avoid inserting duplicates, but I will need to allow duplicates once multiple trackers are in place, so that I can record what each tracker has found in case they disagree (the trackers are third party).
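
One thing that could be checked in a mock project is whether the static event simply has two delegates attached, since that alone would make a single raise call the handler twice. The following is only a rough sketch of that check under my own assumptions: ThreadQueueSketch, SubscriberCount and Raise are made-up names, string stands in for ServiceData, and Rx is left out entirely so the snippet stands on its own.

```csharp
using System;

// Hypothetical stand-in for the static ThreadQueue; not the real class.
public static class ThreadQueueSketch
{
    public delegate void ArrivedHandler(string data);
    public static event ArrivedHandler ArrivedEvent;

    // Number of delegates currently attached to the static event.
    public static int SubscriberCount
    {
        get { return ArrivedEvent == null ? 0 : ArrivedEvent.GetInvocationList().Length; }
    }

    // Raises the event once, using a local copy of the delegate.
    public static void Raise(string data)
    {
        ArrivedHandler handler = ArrivedEvent;
        if (handler != null)
            handler(data);
    }
}

public static class Program
{
    public static void Main()
    {
        int calls = 0;
        ThreadQueueSketch.ArrivedHandler h = _ => calls++;

        ThreadQueueSketch.ArrivedEvent += h;
        ThreadQueueSketch.ArrivedEvent += h; // simulates an accidental second subscription

        ThreadQueueSketch.Raise("x");        // raised once

        // prints "subscribers=2, calls=2"
        Console.WriteLine("subscribers=" + ThreadQueueSketch.SubscriberCount + ", calls=" + calls);
    }
}
```

If the equivalent count on the real ArrivedEvent comes back as 2 while only one DataParser is expected, the duplicate would be coming from a second subscription rather than from the queue raising the event twice.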

Any help would be appreciated. I have several mock projects, so I can try any suggestions and post the results and amendments.

Thanks

This content, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)