|
//
// THIS CODE AND INFORMATION IS PROVIDED "AS IS" WITHOUT WARRANTY OF ANY
// KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
// IMPLIED WARRANTIES OF MERCHANTABILITY AND/OR FITNESS FOR A PARTICULAR
// PURPOSE. IT CAN BE DISTRIBUTED FREE OF CHARGE AS LONG AS THIS HEADER
// REMAINS UNCHANGED.
//
// Email: gustavo_franco@hotmail.com
//
// Copyright (C) 2005 Franco, Gustavo
//
#include "AQConsumer.h"
using namespace AQLib;
namespace AQLib
{
AQConsumer::AQConsumer()
{
mExit = false;
mResetEvent = new AutoResetEvent(false);
mItemProcessed = true;
if (mTracingEnabled)
Trace::WriteLine(String::Concat("[AQLIB]Consumer : Consumer Created [", Thread::CurrentThread->GetHashCode().ToString(), "]"));
}
AQConsumer::~AQConsumer()
{
if (!this->Stop(true))
{
Thread::Sleep(100);
this->Stop(false);
}
}
bool AQConsumer::get_TracingEnabled()
{
return mTracingEnabled;
}
void AQConsumer::set_TracingEnabled(bool enabled)
{
mTracingEnabled = enabled;
}
Thread* AQConsumer::get_RunningThread()
{
return mRunningThread;
}
void AQConsumer::set_RunningThread(Thread* thread)
{
mRunningThread = thread;
}
bool AQConsumer::get_ItemProcessed()
{
return mItemProcessed;
}
Object* AQConsumer::get_Payload()
{
return mPayload;
}
void AQConsumer::set_Payload(Object* payload)
{
mPayload = payload;
mItemProcessed = false;
mResetEvent->Set();
}
void AQConsumer::Start()
{
try
{
if (mTracingEnabled)
Trace::WriteLine(String::Concat("[AQLIB]Consumer : Consumer Started [", Thread::CurrentThread->GetHashCode().ToString(), "]"));
mResetEvent->WaitOne();
while(!mExit)
{
if (mTracingEnabled)
Trace::WriteLine(String::Concat("[AQLIB]Consumer : Consumer Got Payload [", Thread::CurrentThread->GetHashCode().ToString(), "]"));
try
{
ProcessPayload(mPayload);
}
__finally
{
mItemProcessed = true;
}
mResetEvent->WaitOne();
}
if (mTracingEnabled)
Trace::WriteLine(String::Concat("[AQLIB]Consumer : Consumer Stopped [", Thread::CurrentThread->GetHashCode().ToString(), "]"));
}
catch(Exception *ex)
{
if (ex->GetType() == __typeof(System::Threading::ThreadAbortException))
return;
throw new AQException(ERROR_UNKNOW, SERROR_UNKNOW, ex);
}
}
bool AQConsumer::Stop(bool softStop)
{
try
{
if (this->RunningThread == NULL ||
this->RunningThread->ThreadState == System::Threading::ThreadState::Aborted ||
this->RunningThread->ThreadState == System::Threading::ThreadState::Stopped ||
this->RunningThread->ThreadState == System::Threading::ThreadState::StopRequested ||
this->RunningThread->ThreadState == System::Threading::ThreadState::AbortRequested ||
this->RunningThread->ThreadState == System::Threading::ThreadState::SuspendRequested)
return true;
}
catch(ThreadStateException*)
{
//TODO
//Some times it gives exception to receive information about a Stopped thread
//I have to analize the code to see where is the error
return true;
}
mExit = true;
if (softStop)
{
if (mTracingEnabled)
Trace::WriteLine(String::Concat("[AQLIB]Consumer : Consumer Stopped by a Soft Stop [", this->RunningThread->GetHashCode().ToString(), "]"));
mResetEvent->Set();
return false;
}
else
{
mResetEvent->Set();
if (!this->RunningThread->Join(10))
{
if (this->RunningThread->ThreadState == System::Threading::ThreadState::Suspended)
this->RunningThread->Resume();
this->RunningThread->Abort();
if (mTracingEnabled)
Trace::WriteLine(String::Concat("[AQLIB]Consumer : Thread had to be aborted for shutdown [", this->RunningThread->GetHashCode().ToString(), "]"));
//TODO: LOG IT IN THE LOG FILE
this->RunningThread = NULL;
return true;
}
if (mTracingEnabled)
Trace::WriteLine(String::Concat("[AQLIB]Consumer : Consumer Stopped by a Hard Stop [", this->RunningThread->GetHashCode().ToString(), "]"));
this->RunningThread = NULL;
return true;
}
}
}
|
By viewing downloads associated with this article you agree to the Terms of Service and the article's licence.
If a file you wish to view isn't highlighted, and is a text file (not binary), please
let us know and we'll add colourisation support for it.
I started with programming about 19 years ago as a teenager, from my old Commodore moving to PC/Server environment Windows/UNIX SQLServer/Oracle doing gwBasic, QBasic, Turbo Pascal, Assembler, Turbo C, BC, Summer87, Clipper, Fox, SQL, C/C++, Pro*C, VB3/5/6, Java, and today loving C#.
Currently working as SDE on Failover Clustering team for Microsoft.
Passion for most programming languages and my kids Aidan&Nadia.