Hi,
I am using Reactive Extensions to handle events fired from a windows service. Here is the DataParser class:
namespace MessageQueue
{
public class DataParser
{
public DataParser()
{
var fwLanded = Observable.FromEvent<ThreadQueue.ArrivedHandler, ServiceData>(handler =>
{
ThreadQueue.ArrivedHandler fHandler = (e) =>
{
handler(e);
};
return fHandler;
},
fHandler => ThreadQueue.ArrivedEvent += fHandler,
fHandler => ThreadQueue.ArrivedEvent -= fHandler
);
fwLaoaded.GroupBy(fwl => fwl.Instance).Subscribe(g =>
{
LoadedParser landedParser = new LoadedParser(g.Key);
g.Subscribe(grouped =>
loadedParser.Add(grouped.tracker, grouped.trackerInstance)
);
}
);
}
}
}
The ThreadQueue is all static so when the service starts I instantiate this class and nothing more.
The event is fired from the queue class here:
public class DataEventArgs : EventArgs
{
public ServiceData ServiceData;
}
public delegate void ArrivedHandler(ServiceData data);
public static event ArrivedHandler ArrivedEvent;
private static void OnCheck(ServiceData data)
{
if (ArrivedEvent != null)
ArrivedEvent(data);
}
private static void ProcessMessage(Message message, int threadIndex)
{
message.MarkStarted();
Monitor fm = new Monitor(message.trackerInstanceId);
ServiceData data = fm.Check();
if (data != null)
{
if (data.IsTracked)
{
if (data.IsLoaded)
OnCheck(data);
message.MarkComplete();
}
else
{
message.MarkComplete();
RequeueMessage(message, 10);
}
}
else
{
RequeueMessage(message, 10);
}
_waitHandles[threadIndex].Set();
}
(Forgive the var & method naming. I have cut words to hide the nature of the project.)
I have debugged the code. Each message is handles once (no accidentally picking up the same message twice); the ThreadQueue ProcessMessage is run once per message, and the ArrivedEvent in OnCheck is called once per message. And yet, here is the problem
fHandler fires twice >_<
I cannot see why. I am currently checking the database to avoid inserting duplicates but I will need to add duplicates when multiple trackers are in place so I can record what each tracker has found in case they disagree (trackers are third party).
Any help would be appreciated. I have several mock projects so I can try any suggestions and / or post results and amendments
Thanks