Click here to Skip to main content
15,896,915 members
Articles / Desktop Programming / MFC

.NET Dynamic Software Load Balancing

Rate me:
Please Sign up or sign in to vote.
4.96/5 (111 votes)
9 Dec 2002 · 71 min read · 511.8K   4.5K   242  
A Draft Implementation of an Idea for .NET Dynamic Software Load Balancing
//////////////////////////////////////////////////////////////////////////////
// .NET Dynamic Software Load Balancing
// A Draft Implementation of an Idea for .NET Dynamic Software Load Balancing
// (c) 2002, Stoyan Damov (stoyan_damov@hotmail.com)
// The software comes "AS IS", with all faults and no warranties.
//////////////////////////////////////////////////////////////////////////////

#include "stdafx.h"
#include "LoadMonitoringServer.h"
#include "MachineLoadsCollection.h"
#include "CollectorWorker.h"
#include "ReporterWorker.h"

using namespace System::Net;
using namespace System::Net::Sockets;
using namespace System::Runtime::InteropServices;

namespace SoftwareLoadBalancing
{

// PUBLIC
// Constructs the server in the Stopped state and loads its settings from
// the Configurator singleton: listening IP address, collector/reporter
// ports and backlogs, the machine report timeout and the Remoting channel
// (protocol and port). Nothing is started here; see Start ().
//
// Throws InvalidOperationException when a setting is missing (text settings
// come back as 0, numeric settings come back as the 0 default), when the
// IP address cannot be resolved, or when the Remoting protocol is neither
// CHANNEL_TCP nor CHANNEL_HTTP.
//
LoadMonitoringServer::LoadMonitoringServer () :
	Tracer (S"LoadMonitoringServer")
{
	Trace (S"Initializing...");

	// nothing is running and nothing is registered yet
	//
	status = ServerStatus::Stopped;
	registeredForRemoting = false;

	collectorThread = 0;
	reporterThread = 0;

	eventCollectorThreadDone = 0;
	eventReporterThreadDone = 0;

	collectorThreadSucceeded = false;
	reporterThreadSucceeded = false;

	Trace (S"Configuring...");

	cfg = Configurator::Instance;

	// listening address: must be present and must resolve
	//
	String __gc* ip = cfg->GetText (CS_LMS, CK_LMS_IP_ADDRESS, 0);
	DEBUG_ASSERT (0 != ip);
	if (ip == 0)
	{
		throw new InvalidOperationException (
			S"IP address is not configured");
	}

	ipAddress = IpHelper::GetIpAddress (ip);
	DEBUG_ASSERT (0 != ipAddress);
	if (ipAddress == 0)
	{
		throw new InvalidOperationException (
			S"IP address is invalid");
	}

	// collector socket settings (0 is the "not configured" default)
	//
	collectorPort = cfg->GetInt (CS_LMS, CK_LMS_COLLECTOR_PORT, 0);
	DEBUG_ASSERT (0 != collectorPort);
	if (collectorPort == 0)
	{
		throw new InvalidOperationException (
			S"Collector port is not configured");
	}

	collectorBacklog = cfg->GetInt (CS_LMS, CK_LMS_COLLECTOR_BACKLOG, 0);
	DEBUG_ASSERT (0 != collectorBacklog);
	if (collectorBacklog == 0)
	{
		throw new InvalidOperationException (
			S"Collector backlog is not configured");
	}

	// reporter socket settings
	//
	reporterPort = cfg->GetInt (CS_LMS, CK_LMS_REPORTER_PORT, 0);
	DEBUG_ASSERT (0 != reporterPort);
	if (reporterPort == 0)
	{
		throw new InvalidOperationException (
			S"Reporter port is not configured");
	}

	reporterBacklog = cfg->GetInt (CS_LMS, CK_LMS_REPORTER_BACKLOG, 0);
	DEBUG_ASSERT (0 != reporterBacklog);
	if (reporterBacklog == 0)
	{
		throw new InvalidOperationException (
			S"Reporter backlog is not configured");
	}

	// timeout handed to the machine loads collection created below
	//
	int reportTimeout = cfg->GetInt (CS_LMS, CK_LMS_MACHINE_REPORT_TIMEOUT, 0);
	DEBUG_ASSERT (0 != reportTimeout);
	if (reportTimeout == 0)
	{
		throw new InvalidOperationException (
			S"Machine report timeout is not configured");
	}

	// Remoting channel protocol: lower-cased, then matched exactly
	// against the two supported protocol names
	//
	String __gc* channelProtocol = cfg->GetText (CS_LMS, CK_LMS_REMOTING_PROTOCOL, 0);
	DEBUG_ASSERT (0 != channelProtocol);
	if (channelProtocol == 0)
	{
		throw new InvalidOperationException (
			S"Remoting channel protocol is not configured");
	}
	channelProtocol = channelProtocol->ToLower ();

	bool channelIsTcp = 
		(0 == String::Compare (channelProtocol, CHANNEL_TCP, false));
	bool channelIsHttp = 
		(0 == String::Compare (channelProtocol, CHANNEL_HTTP, false));
	if (!channelIsTcp && !channelIsHttp)
	{
		throw new InvalidOperationException (
			S"Remoting channel protocol can only be 'tcp' or 'http'");
	}

	int channelPort = cfg->GetInt (CS_LMS, CK_LMS_REMOTING_CHANNEL_PORT, 0);
	DEBUG_ASSERT (0 != channelPort );
	if (channelPort == 0)
	{
		throw new InvalidOperationException (
			S"Remoting Channel Port is not configured");
	}

	// the channel object is only created here; it is registered with
	// the Remoting runtime later, from Start ()
	//
	if (channelIsTcp)
	{
		remotingChannel = new TcpChannel (channelPort);
	}
	else
	{
		remotingChannel = new HttpChannel (channelPort);
	}

	Trace (S"Configuration done.");

	// create the collection shared by both workers and publish it
	// through the static "StaticMachineLoads" member (the hack the
	// original author refers to)
	//
	machineLoads = new MachineLoadsCollection (reportTimeout);
	StaticMachineLoads = machineLoads;

	Trace (S"Initialization done.");
}

// PRIVATE
// Destructor: the server is expected to have been stopped already (hence
// the assertion); if it has not, it is stopped here as a last resort.
//
LoadMonitoringServer::~LoadMonitoringServer ()
{
	Trace (S"Terminating...");

	DEBUG_ASSERT (status == ServerStatus::Stopped || 
		status == ServerStatus::Stopping);

	// still starting or running -> shut down now
	//
	if (status != ServerStatus::Stopped && status != ServerStatus::Stopping)
	{
		Stop ();
	}

	Trace (S"Termination done.");
}

// PUBLIC
void LoadMonitoringServer::Start ()
{
	Trace (S"Starting...");

	DEBUG_ASSERT (! (status == ServerStatus::Starting ||
					 (status == ServerStatus::Started)));
	if (status == ServerStatus::Starting || status == ServerStatus::Started)
		return;

	status = ServerStatus::Starting;
	
	// subscribe the server to receive notification when the configuration
	// changes (the underlying configuration file is modified)
	//
	cfg->add_Changed (
		new Configurator::ChangedEventHandler (
			this, &LoadMonitoringServer::OnConfigurationChanged));

	// remove the const'ness of "this" to get away from C2664
	LoadMonitoringServer __gc* thisServer = 
		const_cast<LoadMonitoringServer __gc*> (this);

	// set up the signalling events
	//
	eventCollectorThreadDone = new AutoResetEvent (false);
	eventReporterThreadDone = new AutoResetEvent (false);

	Trace (S"Starting collector thread...");
	// launch the collector worker thread
	//
	CollectorWorker __gc* collectorWorker = new CollectorWorker (
		thisServer, 
		new WorkerDoneEventHandler (
			this, 
			&LoadMonitoringServer::OnCollectorThreadDone),
		ipAddress,
		collectorPort,
		collectorBacklog,
		machineLoads);
	collectorThread = new Thread (
		new ThreadStart (collectorWorker, &CollectorWorker::Run));
	collectorThread->Name = S"Load Monitoring Server's Collector Thread";
	collectorThread->Start ();
	Trace (S"Collector thread started.");

	Trace (S"Starting reporter thread...");
	// launch the reporter worker thread
	//
	ReporterWorker __gc* reporterWorker = new ReporterWorker (
		thisServer, 
		new WorkerDoneEventHandler (
			this, 
			&LoadMonitoringServer::OnReporterThreadDone),
		reporterPort,
		reporterBacklog,
		machineLoads);
	reporterThread = new Thread (
		new ThreadStart (reporterWorker, &ReporterWorker::Run));
	reporterThread->Name = S"Load Monitoring Server's Reporter Thread";
	reporterThread->Start ();
	Trace (S"Reporter thread started.");

	// wait for the threads to signal us, by calling out the callbacks
	//
	eventCollectorThreadDone->WaitOne (Timeout::Infinite, false);
	eventReporterThreadDone->WaitOne (Timeout::Infinite, false);

	// check the outcome of the thread results
	//
	if (collectorThreadSucceeded && reporterThreadSucceeded)
	{
		status = ServerStatus::Started;
		
		// register the ServerLoadBalancer object with the Remoting
		// runtime
		//
		registeredForRemoting = RegisterLoadBalancerForRemoting ();
		DEBUG_ASSERT (registeredForRemoting);
		if (!registeredForRemoting)
		{
			Stop ();
			throw (new InvalidOperationException (
				S"Cannot register load balancer object "
				S"with the Remoting runtime!"));
		}
		
		Trace (S"Started.");
	}
	else
	{
		Stop ();
	}
}

// PUBLIC
// Stops the server: unsubscribes from configuration change notifications,
// unregisters the Remoting channel (if it was registered), then waits for
// the worker threads to finish, aborting any thread that does not finish
// within ThreadJoinTimeout.
//
// Does nothing when the server is already Stopping or Stopped.
//
void LoadMonitoringServer::Stop ()
{
	Trace (S"Stopping...");

	DEBUG_ASSERT (! (status == ServerStatus::Stopping ||
					 (status == ServerStatus::Stopped)));
	if (status == ServerStatus::Stopping || status == ServerStatus::Stopped)
		return;

	// unsubscribe from the "configuration changed" event
	//
	cfg->remove_Changed (
		new Configurator::ChangedEventHandler (
			this, &LoadMonitoringServer::OnConfigurationChanged));
			
	// unregister the ServerLoadBalancer object from the Remoting
	// runtime (note the "!" in front of the method).
	// Start () also calls Stop () on its failure paths, where the
	// object has never been registered; unregistering is skipped then,
	// because UnregisterLoadBalancerFromRemoting () asserts that a
	// registration exists.
	//
	if (registeredForRemoting)
	{
		registeredForRemoting = !UnregisterLoadBalancerFromRemoting ();
		DEBUG_ASSERT (!registeredForRemoting);
	}

	// NOTE(review): presumably the workers watch the server status to
	// know when to exit - confirm in CollectorWorker/ReporterWorker
	//
	status = ServerStatus::Stopping;

	// the thread members are still 0 if Start () failed before creating
	// them (status is already Starting at that point), so each one is
	// checked before it is dereferenced
	//
	if (0 != reporterThread &&
		reporterThread->IsAlive && 
		!reporterThread->Join (ThreadJoinTimeout))
	{
		Trace (S"Aborting reporter thread...");
		reporterThread->Abort ();
		reporterThread->Join ();
		Trace (S"Reporter thread aborted.");
	}
	if (0 != collectorThread &&
		collectorThread->IsAlive && 
		!collectorThread->Join (ThreadJoinTimeout))
	{
		Trace (S"Aborting collector thread...");
		collectorThread->Abort ();
		collectorThread->Join ();
		Trace (S"Collector thread aborted.");
	}
	status = ServerStatus::Stopped;
	
	Trace (S"Stopped.");
}

		
// Registers the Remoting channel created by the constructor and publishes
// ServerLoadBalancer as a well-known Singleton object under
// REMOTED_OBJECT_NAME.
//
// Returns true on success (or when already registered), false when there
// is no channel or when registration fails. Exceptions are passed to
// TRACE_EXCEPTION_AND_RETHROW_IF_NEEDED, which may rethrow them.
//
bool LoadMonitoringServer::RegisterLoadBalancerForRemoting ()
{
	DEBUG_ASSERT (!registeredForRemoting);
	if (registeredForRemoting)
		return (true);
		
	DEBUG_ASSERT (0 != remotingChannel);
	if (0 == remotingChannel)
		return (false);

	bool result = false;
	// tracks whether the channel itself got registered, so that a later
	// failure can roll it back
	//
	bool channelRegistered = false;
	try
	{
		ChannelServices::RegisterChannel (remotingChannel);
		channelRegistered = true;
		RemotingConfiguration::ApplicationName = REMOTED_OBJECT_NAME;
		RemotingConfiguration::RegisterWellKnownServiceType (
			__typeof (ServerLoadBalancer),
			REMOTED_OBJECT_NAME,
			WellKnownObjectMode::Singleton);
		result = true;
	}
	catch (Exception __gc* e)
	{
		// the caller sees "not registered" and therefore never
		// unregisters, so a channel that did get registered must be
		// released here; this happens before the trace macro because
		// the macro may rethrow
		//
		if (channelRegistered)
		{
			try
			{
				ChannelServices::UnregisterChannel (remotingChannel);
			}
			catch (Exception __gc*)
			{
				// best effort: the original failure (e) is the one
				// that gets reported below
			}
		}
		TRACE_EXCEPTION_AND_RETHROW_IF_NEEDED (e);
		result = false;
	}
	return (result);
}


// Unregisters the Remoting channel from the Remoting runtime.
//
// Returns true when the channel has been unregistered (or nothing was
// registered in the first place), false when there is no channel object
// or when unregistering fails. Exceptions are passed to
// TRACE_EXCEPTION_AND_RETHROW_IF_NEEDED, which may rethrow them.
//
bool LoadMonitoringServer::UnregisterLoadBalancerFromRemoting ()
{
	// callers are expected to call this only while registered
	//
	DEBUG_ASSERT (registeredForRemoting);
	if (!registeredForRemoting)
	{
		return true;
	}

	DEBUG_ASSERT (0 != remotingChannel);
	if (remotingChannel == 0)
	{
		return false;
	}

	bool unregistered;
	try
	{
		ChannelServices::UnregisterChannel (remotingChannel);
		unregistered = true;
	}
	catch (Exception __gc* e)
	{
		TRACE_EXCEPTION_AND_RETHROW_IF_NEEDED (e);
		unregistered = false;
	}

	return unregistered;
}


// PUBLIC __property
// Getter of the Status property: returns the server's current status.
//
ServerStatus LoadMonitoringServer::get_Status ()
{
	return this->status;
}

// Getter of the MachineLoads property: returns the machine loads
// collection created by the constructor.
//
MachineLoadsCollection __gc* LoadMonitoringServer::get_MachineLoads ()
{
	return this->machineLoads;
}

// Handler for the Configurator's "Changed" event (subscribed in Start (),
// unsubscribed in Stop ()): restarts the server.
//
// NOTE(review): the settings are read in the constructor only, so as far
// as this file shows the restart reuses the previously cached values -
// confirm whether re-reading the configuration was intended.
//
void LoadMonitoringServer::OnConfigurationChanged (
	Object __gc*	sender,
	EventArgs __gc*	e)
{
	// neither argument is needed
	//
	(void) sender;
	(void) e;

	// full restart: shut everything down, then bring it back up
	//
	this->Stop ();
	this->Start ();
}

void LoadMonitoringServer::OnCollectorThreadDone (bool succeeded)
{
	collectorThreadSucceeded = succeeded;
	// signal the main thread's Start method, that the
	// collectorThread is up and running or the collectorThread 
	// has exited prematurely
	//
	eventCollectorThreadDone->Set ();
}

void LoadMonitoringServer::OnReporterThreadDone (bool succeeded)
{
	reporterThreadSucceeded = succeeded;
	// signal the main thread's Start method, that the
	// collectorThread is up and running or the collectorThread 
	// has exited prematurely
	//
	eventReporterThreadDone->Set ();
}

} // namespace

By viewing downloads associated with this article you agree to the Terms of Service and the article's licence.

If a file you wish to view isn't highlighted, and is a text file (not binary), please let us know and we'll add colourisation support for it.

License

This article has no explicit license attached to it but may contain usage terms in the article text or the download files themselves. If in doubt please contact the author via the discussion board below.

A list of licenses authors might use can be found here


Written By
Web Developer
Bulgaria Bulgaria
I'm crazy about programming, bleeding-edge technologies and my wife, Irina. Thinking seriously to start living in Centurian time.

The image shows me, happy :P

My blog

Comments and Discussions