Click here to Skip to main content
10,500,000 members (67,287 online)
Click here to Skip to main content
Add your own
alternative version

Stats

8.1K views
16 bookmarked
Posted

Distributed Semantic Computing

, 24 Jul 2015 CPOL
Rate this:
Please Sign up or sign in to vote.
Distributed semantic computing in a multi-threaded, Type-First Development (TFD) system

Introduction

I have written previously about semantic computing in the Higher Order Programming Environment (HOPE) articles:

Higher Order Programming Environment
APOD Web Scraper, A HOPE Demonstration
Hunt the Wumpus Implemented in HOPE
A Semantic Database In Action
The Semantic Web and Natural Language Processing

In this article, I will demonstrate distributed semantic computing using a Type-First Development (TFD) approach, a term first coined by Tomas Petricek in his blog entry "Why type-first development matters."

In this article, I've re-written the HOPE engine to utilize "type declarative programming."  This is a style of programming that relies heavily on generics to declaratively describe what should be done, not how.  It is the other side of the TFD coin -- in addition to developing types first, we also implement processes that operate on generic types, particularly those that implement specific interfaces.  Similar to how events, delegates, callbacks, and so forth are used for an inversion of control with regards to program behavior, "type declarative programming" is an inversion of control for instantiating objects.  Unlike HOPE, where types are declared in XML and compiled at runtime, here we use types implemented in the code itself.  Because of .NET's rich reflection and assembly loading capabilities, the difference is irrelevant to the overall goals of HOPE, but the difference to the developer is significant, especially with regards to the safety that a typed language gives you at runtime and the ease of programming (Intellisense and compile-time checking) in a typed language during development.

Source Code

 

The code can be found on GitHub: https://github.com/cliftonm/SemanticProcessor

Fundamentals of Type-First Programming

The core tenet of type-based programming is that it is declarative.  Using generics, we describe "what we want instantiated" rather than "how/when to instantiate it."  As Mr. Petricek states regarding type-first programming: "...when designing a program, you start thinking about the (data) types that represent the data your code works with...The development is not driven by types. It starts with types..."  The difference is technically very simple:

How:

Foo foo = new Foo();

What:

Foo foo = Proggy.Make<Foo>();

While the "what" example above looks trivial, consider what this affords you in a simple logging example:

public static class Proggy
{
  public static T Make<T>()
    where T : new()
  {
    Console.WriteLine("Making " + typeof(T).Name);
    return new T();
  }
}

class Program
{
  static void Main(string[] args)
  {
    Proggy.Make<StringBuilder>();
  }
}

Fundamentals of Semantic Computing

Semantic computing also reduces down to two very simple concepts:

  • A type is "semantic" -- it not only describes, but qualifies the meaning of its structure.
  • Computations are associated with semantic types.

This is orthogonal to object-oriented programming.  In OOP, an object carries around with it a collection of methods which implement computations on other (usually native, not semantic) types!  For example, in a simple OOP class:

A Non-Semantic Example

public class Receipt
{
  public decimal Total(decimal amount, decimal taxes) { return amount * (1 + taxes); }
}

class Program
{
  static void Main(string[] args)
  {
    Console.WriteLine("Non-semantic: " + new Receipt().Total(1M, .07M));
  }
}

A Semantic Example

To convert this to something that is suitable for semantic computing, we need to introduce a couple concepts: classes that are semantic types, and classes that process semantic types. 

  A semantic processor is actually just a sophisticated publisher/subscriber system.

Semantic Type Classes

We implement a semantic type through the use of an interface and a concrete class:

public interface ISemanticType { }

public class Purchase : ISemanticType
{
  public decimal Total { get; set; }
  public decimal Taxes { get; set; }
}

  Technically, even Total and Taxes should / could be semantic types, providing semantic meaning to their language-native types.

A semantic type:

  1. Has no explicit constructor.
  2. Does not implement computational methods
  3. Implements an interface that declares this class to be a semantic type--the reason for this will be described later.
  4. The interface has no methods or properties, it is simply a way of describing "this thing is a semantic type."

Processing Semantic Types: Receptors

We need to implement "something" that processes the semantic type.  Borrowing from HOPE, the "something" is called a "receptor":

public interface IReceptor { }
public interface IReceptor<T> : IReceptor
  where T : ISemanticType
{
  void Process(T semanticType);
}

public class Computation : IReceptor<Purchase>
{
  public void Process(Purchase p)
  {
    Console.WriteLine("Semantic:" + p.Total * (1 + p.Taxes));
  }
}

We note several key things here (should I have used the note icon or the key icon?):

  1. We have a memberless IReceptor interface.
  2. We provide an interface IReceptor<T> to declare that a specific Process method with an ISemanticType parameter.  While not necessary, this is a useful declaration of what Process methods the concrete receptor needs to implement.
  3. We implement a concrete receptor that processes the Purchase type.

Introducing The Semantic Processor

We need something that calls the Process method on receptors receiving the semantic type when a semantic type is actually instantiated:

public class SemanticProcessor
{
  protected Dictionary<Type, List<Type>> typeReceptors;

  public SemanticProcessor()
  {
    typeReceptors = new Dictionary<Type, List<Type>>();
  }

  public void Register<T, R>()
    where T : ISemanticType
    where R : IReceptor
  {
    List<Type> receptors;
    Type ttype = typeof(T);
    Type rtype = typeof(R);

    if (!typeReceptors.TryGetValue(ttype, out receptors))
    {
      receptors = new List<Type>();
      typeReceptors[ttype] = receptors;
    }

    receptors.Add(rtype);
  }

  public void ProcessInstance<T>(Action<T> initializer)
    where T : ISemanticType, new()
  {
    Type ttype = typeof(T);
    T semType = new T();
    initializer(semType);

    foreach (Type rtype in typeReceptors[ttype])
    {
      dynamic receptor = Activator.CreateInstance(rtype);
      receptor.Process(semType);
    }
  }
}

Putting It All Together

Using the SemanticProcessor involves a two step process:

  1. Register the semantic type with one or more receptors.
  2. When we need some computation(s) performed on the semantic type, call the ProcessInstance method with an Action<T> initializer to initialize properties of the semantic type.

It looks like this:

static void Main(string[] args)
{
  // non-semantic computation:
  Console.WriteLine("Non-semantic: " + new Receipt().Total(1M, .07M));

  // semantic computing:
  SemanticProcessor sp = new SemanticProcessor();
  sp.Register<Purchase, Computation>();
  sp.Process<Purchase>((t) => { t.Total = 1M; t.Taxes = 0.07M; });
}

We observe:

  1. The receptor is instantiated on demand.  A huge advantage here as we don't ourselves need to manage collections of instances anymore -- throw out your dependency injectors!
  2. We take advantage of the dynamic keyword and let C# handle the reflection to invoke the correct Process method for the required semantic type instance.
  3. Our program isn't instantiating anything itself.

The fact that the receptor is being instantiated by the SemanticProcessor allows us to, in the more sophisticated implementation that you'll see later:

  1. Wrap the Process call in a try-catch block to provide a unified exception handling mechanism.
  2. Log all processing.
  3. When the call completes, automatically call Dispose on receptors that implement IDisposable.
  4. Execute the call asynchronously -- calls into receptors to process semantic types can be put on other threads.
  5. Receptors, being constructed by the semantic processor for the sole purpose of processing a semantic type, are effectively stateless (sometimes we need a stateful receptor--the real implementation of the semantic processor supports this.)
  6. Distribute the call to other receptors on the network for processing.

The last point, "distribute the call to other receptors on the network", unlocks a vast potential for distributed semantic computing!

Implementing a Real Semantic Processor

The code above is too simplistic for developing real semantic applications.  We need:

  • The ability to work with both stateless (instantiated by the semantic processor) and stateful (instantiated by the application) receptors.
  • Semantic types should be processed asynchronously on worker threads (the default behavior) as well as the caller's thread for synchronous requirements
  • In addition to the semantic type, its sub-types should be processed as well by any interested parties (receptors.)  This allows us to create new semantic types while still retaining the behaviors implemented with sub-types.
  • To manage the exchange of semantic types between receptors, we need containers that "contain" the communication between receptors.  In HOPE, these containers are called membranes, and I'll use that term here as well, borrowing from a field called Membrane Computing.

We also want the implementation to provide the capabilities described at the end of the previous section:

  1. Wrap the Process call in a try-catch block to provide a unified exception handling mechanism.
  2. Log all processing.
  3. When the call completes, automatically call Dispose on receptors that implement IDisposable.
  4. Execute the call asynchronously -- calls into receptors to process semantic types can be put on other threads.
  5. Receptors, being constructed by the semantic processor for the sole purpose of processing a semantic type, are effectively stateless (sometimes we need a stateful receptor--the real implementation of the semantic processor supports this.)
  6. Distribute the call to other receptors on the network for processing.

Integration tests are a good way of illustrating the functionality of the Semantic Processor -- I'll be using NUnit for running integration tests.  I will also dive into some of the more interesting code at certain point in the integration tests. 

Why do I call them integration tests?  Because they demonstrate configuring and testing a particular scenario rather than individual methods.  This more useful because:

  1. It illustrates an real life use case
  2. It exercises the system rather than discrete methods
  3. In real life, most useful tests are actually integration tests

Membrane Registration

Membranes are containers for systems of receptors, which I call computational islands.  Membranes have some advanced features as well, which we'll look at later.  But for now, the important thing to know is that in any one Semantic Processor "system", a particular membrane type can only exist once in that system. 

Behind the scenes, the implementation looks like this:

public IMembrane RegisterMembrane<M>()
  where M : IMembrane, new()
{
  IMembrane membrane;
  Type m = typeof(M);

  if (!membranes.TryGetValue(m, out membrane))
  {
    membrane = new M();
    membranes[m] = membrane;
    membraneReceptorTypes[membrane] = new List<Type>();
    membraneReceptorInstances[membrane] = new List<IReceptor>();
  }

  return membrane;
}

Note that the membrane instance is instantiated immediately and stored in collection associated with its type.  Additional collections are also initialized for managing stateless receptor types and stateful instances within membranes.

We can test that membranes are distinct by type:

/// <summary>
/// Registering a membrane creates an instance of that membrane.
/// </summary>
[Test]
public void RegisterMembraneType()
{
  SemanticProcessor sp = new SemanticProcessor();
  IMembrane membrane = sp.RegisterMembrane<TestMembrane>();
  Assert.That(sp.Membranes.Contains(membrane), "Expected membrane instance.");
}

/// <summary>
/// Registering the same membrane type returns the same instance.
/// </summary>
[Test]
public void RegisterSameMembraneType()
{
  SemanticProcessor sp = new SemanticProcessor();
  IMembrane membrane1 = sp.RegisterMembrane<TestMembrane>();
  IMembrane membrane2 = sp.RegisterMembrane<TestMembrane>();
  Assert.That(membrane1 == membrane2, "Expected the same membrane instance.");
}

Stateless Receptors

Receptors process semantic types when the semantic type is "published".  Stateless receptors are created and destroyed as needed by the semantic processor (given that they usually run on their own thread, this avoids any mutable, cross-thread issues that would otherwise arise in a stateful, persistent receptor -- in other words, the system protects you from yourself.)  To test some basic operations of a stateless receptor, we need:

  • a test membrane
  • a test semantic type
  • a test receptor

In our test fixture, we'll have the receptor implement IDisposable so we can also test that the receptor is disposed after the call.  We'll also implement an interface and a sub-classed receptor to test that passing objects that implement an interfaces.  Here's the basic pieces:

public class TestMembrane : Membrane { }
public class TestSemanticType : ISemanticType { }

public interface ITestSemanticType { };
public class InterfaceTestSemanticType : ISemanticType, ITestSemanticType { }

public class TestReceptor : IReceptor, IDisposable
{
  public bool AFlag { get; set; }

  public TestReceptor()
  {
    constructorCalled = true;
  }

  public void Process(ISemanticProcessor proc, IMembrane membrane, TestSemanticType t)
  {
    callSuccess = true;
  }

  public void Dispose()
  {
    disposeCalled = true;
  }
}

public class TestReceptor2 : IReceptor
{
  public void Process(ISemanticProcessor proc, IMembrane membrane, TestSemanticType t)
  {
    callSuccess2 = true;
  }
}

public class DerivedTestReceptor : TestReceptor
{
}

// IReceptor type is optional, but good practice to make sure you implement Process on the semantic type.
public class InterfaceTestReceptor : IReceptor<ITestSemanticType> 
{
  public void Process(ISemanticProcessor proc, IMembrane membrane, ITestSemanticType t)
  {
    callSuccess = true;
  }
}

  The integration test's comments should be adequately explain what the test is doing.  Inspect how the membranes, semantic types, and receptors are set up in each test.  Note that all of these tests are performed in "immediate execute" mode rather attaching the processing onto a thread.  This makes the integration testing a lot easier.

/// <summary>
/// Given a receptor in a membrane, a semantic type put into that membrane is received by that receptor.
/// </summary>
[Test]
public void ReceptorReceivesSemanticTypeOnItsMembrane()
{
  callSuccess = false;
  SemanticProcessor sp = new SemanticProcessor();
  sp.Register<TestMembrane, TestReceptor>();
  sp.ProcessInstance<TestMembrane, TestSemanticType>(true);
  Assert.That(callSuccess, "Expected TestReceptor.Process to be called.");
}

/// <summary>
/// Given a semantic type put into one membrane, the receptor in another membrane does not receive it.
/// </summary>
[Test]
public void ReceptorDoesNotReceiveSemanticTypeOnAnotherMembrane()
{
  callSuccess = false;
  SemanticProcessor sp = new SemanticProcessor();
  sp.Register<TestMembrane, TestReceptor>();
  sp.ProcessInstance<TestMembrane2, TestSemanticType>(true);
  Assert.That(!callSuccess, "Expected TestReceptor.Process to NOT be called.");
}

/// <summary>
/// Test that when we remove a semantic type from a membrane's receptor, the receptor no longer gets Process calls.
/// </summary>
[Test]
public void RemoveType()
{
  callSuccess = false;
  SemanticProcessor sp = new SemanticProcessor();
  sp.Register<TestMembrane, TestReceptor>();
  sp.RemoveTypeNotify<TestMembrane, TestReceptor, TestSemanticType>();
  sp.ProcessInstance<TestMembrane, TestSemanticType>(true);
  Assert.That(!callSuccess, "Expected TestReceptor.Process to NOT be called.");
}

/// <summary>
/// Verify that when processing a semantic type, the receptor, registered by type, is created and destroyed.
/// </summary>
[Test]
public void ReceptorTypeCreateDestroy()
{
  constructorCalled = false;
  disposeCalled = false;
  SemanticProcessor sp = new SemanticProcessor();
  sp.Register<TestMembrane, TestReceptor>();
  sp.ProcessInstance<TestMembrane, TestSemanticType>(true);
  Assert.That(constructorCalled, "Expected constructor to be called.");
  Assert.That(disposeCalled, "Expected Dispose to be called.");
}

/// <summary>
/// Test that a semantic instance initializer is called when the semantic type is constructed.
/// </summary>
[Test]
public void InitializerCalledForSemanticTypeConstruction()
{
  bool initializerCalled = false;
  SemanticProcessor sp = new SemanticProcessor();
  sp.Register<TestMembrane, TestReceptor>();
  sp.ProcessInstance<TestMembrane, TestSemanticType>((t) => initializerCalled = true, true);
  Assert.That(initializerCalled, "Expected semantic type initializer to be called.");
}

/// <summary>
/// Test that the base class' Process method gets called for a type that it handles,
/// even though we instantiated a sub-class.
/// </summary>
[Test]
public void BaseClassProcessCalled()
{
  callSuccess = false;
  SemanticProcessor sp = new SemanticProcessor();
  sp.Register<TestMembrane, DerivedTestReceptor>();
  sp.ProcessInstance<TestMembrane, TestSemanticType>(true);
  Assert.That(callSuccess, "Expected TestReceptor.Process to be called.");
}

/// <summary>
/// Test that a receptor that implements Process on an interface gets called.
/// </summary>
[Test]
public void ReceptorOfInterfaceTypeCalled()
{
  callSuccess = false;
  SemanticProcessor sp = new SemanticProcessor();
  sp.Register<TestMembrane, InterfaceTestReceptor>();
  sp.ProcessInstance<TestMembrane, InterfaceTestSemanticType>(true);
  Assert.That(callSuccess, "Expected TestReceptor.Process to be called.");
}
/// <summary>
/// Verify that more than one receptor (but of different types in the same membrane) receives the Process call for the same semantic type.
/// </summary>
[Test]
public void MultipleProcessCalls()
{
  callSuccess = false;
  callSuccess2 = false;
  SemanticProcessor sp = new SemanticProcessor();
  sp.Register<TestMembrane, TestReceptor>();
  sp.Register<TestMembrane, TestReceptor2>();
  sp.ProcessInstance<TestMembrane, TestSemanticType>(true);
  Assert.That(callSuccess, "Expected TestReceptor.Process to be called.");
  Assert.That(callSuccess2, "Expected TestReceptor2.Process to be called.");
}

/// <summary>
/// Verify that the receptor initializer is called when a stateless receptor is instantiated.
/// </summary>
[Test]
public void ReceptorInitialization()
{
  receptorInitializerCalled = false;
  SemanticProcessor sp = new SemanticProcessor();
  sp.Register<TestMembrane, TestReceptor>((ir) =>
  {
    // Unfortunately, a cast is required, because ir is type declared as IReceptor
    // and I don't think it's possible to fix that because of the late callback.
    TestReceptor r = (TestReceptor)ir;
    r.AFlag = true;
    receptorInitializerCalled = true;
  });
  sp.ProcessInstance<TestMembrane, TestSemanticType>(true);
  Assert.That(receptorInitializerCalled, "Expected TestReceptor initializer to be called to be called.");
}

Under the hood, there's a few interesting things going on.  First, is the ProcessInstance method itself (I'm showing only a subset of the method):

protected void ProcessInstance<T>(IMembrane membrane, IMembrane caller, T obj, bool processOnCallerThread)
  where T : ISemanticType
{
  Type tsource = obj.GetType();
  List<Type> receptors = GetReceptors(membrane, tsource);
  Log(membrane, obj);

  foreach (Type ttarget in receptors)
  {
    dynamic target = Activator.CreateInstance(ttarget);
    ReceptorInitializer receptorInitializer;

    if (receptorInitializers.TryGetValue(new MembraneReceptor() 
       { Membrane = membrane, ReceptorType = ttarget }, out receptorInitializer))
    {
      receptorInitializer.Initializer(target);
    }

  // Call immediately?
  if (processOnCallerThread)
  {
    Call(new DynamicCall() { SemanticInstance = obj, Receptor = target, Proc = () => target.Process(this, membrane, obj) });
  }
  else
  {
    // Pick a thread that has the least work to do.
    threadPool.MinBy(tp => tp.Count).Enqueue(new DynamicCall() { SemanticInstance = obj, Receptor = target, Proc = () => target.Process(this, membrane, obj) });
  }
}

The other half is the call itself, which is either performed immediately or queued onto a thread with the least number of worker to do.  The call is wrapped in a try-catch block and, if the receptor implements IDisposable, calls the Dispose method immediately upon completion of the processing:

protected void Call(ProcessCall rc)
{
  try
  {
    rc.MakeCall();
  }
  catch (Exception ex)
  {
    // Prevent recursion if the exception process itself throws an exception.
    if (!(rc.SemanticInstance is ST_Exception))
    {
      ProcessInstance(Logger, new ST_Exception(ex), true);
    }
  }
  finally
  {
    if ( (rc.Receptor is IDisposable) && (rc.AutoDispose) )
    {
      ((IDisposable)rc.Receptor).Dispose();
    }
  }
}

As you can see, exception handling uses the semantic processor -- the exception is wrapped into a semantic type and placed into the Logger membrane, which is one of two membranes (the other being Surface) that the semantic processor creates for you.

Logging Receptors

Since I concluded the previous section talking about exception logging, it seems reasonable to demonstrate the integration tests for basic logging and exception logging.  The scaffolding for the integration tests involves a test receptor to throw an exception and two other receptors, one for normal event logging and the other for exception logging.

public static bool stLogged;
public static bool exLogged;

public class TestMembrane : Membrane { }
public class TestSemanticType : ISemanticType { }
public class TypeThrowsException : ISemanticType { }

public class TestReceptor : IReceptor
{
  public void Process(ISemanticProcessor proc, IMembrane membrane, TestSemanticType t)
  {
  }

  public void Process(ISemanticProcessor proc, IMembrane membrane, TypeThrowsException t)
  {
    throw new ApplicationException("Receptor exception");
  }
}

public class LoggerReceptor : IReceptor
{
  public void Process(ISemanticProcessor proc, IMembrane membrane, ISemanticType t)
  {
    stLogged = t is TestSemanticType;
  }
}

public class ExceptionReceptor : IReceptor
{
  public void Process(ISemanticProcessor proc, IMembrane membrane, ST_Exception ex)
  {
    exLogged = true;
  }
}

The standard logger processes ISemanticType -- since all semantic types derive from this interface, the logger's Process method will receive a notification for every single semantic type that is instantiated into the semantic processor.

  We have two integration tests, one for normal event logging and one for exception logging:

/// <summary>
/// Verify the a process call is logged.
/// </summary>
[Test]
public void ProcessCallIsLogged()
{
  stLogged = false;
  SemanticProcessor sp = new SemanticProcessor();
  sp.Register<LoggerMembrane, LoggerReceptor>();
  sp.Register<LoggerMembrane, ExceptionReceptor>();
  sp.Register<TestMembrane, TestReceptor>();
  sp.ProcessInstance<TestMembrane, TestSemanticType>(true);
  Assert.That(stLogged, "Expected Process call to be logged.");
}

/// <summary>
/// Verify that an exception log is generated when a receptor process creates an exception.
/// </summary>
[Test]
public void ExceptionIsLogged()
{
  exLogged = false;
  SemanticProcessor sp = new SemanticProcessor();
  sp.Register<LoggerMembrane, LoggerReceptor>();
  sp.Register<LoggerMembrane, ExceptionReceptor>();
  sp.Register<TestMembrane, TestReceptor>();
  sp.ProcessInstance<TestMembrane, TypeThrowsException>(true);
  Assert.That(exLogged, "Expected Exception call to be logged.");
}

Stateful Receptors

Stateful receptors are processing units that are instantiated, not by the semantic processor, but by you.  There are a lot of good reasons to have a stateful receptor:

  • Logging - creating and destroying a log event receptor would start to have an impact on performance
  • Services that must persist to receive messages from external sources -- we'll see an example of that in the distributed semantic process test.
  • Complex initialization and/or stateful requirements
  • etc

The integration tests for stateful receptors is essentially identical to the ones for stateless receptors, so I'll just show one to illustrate the difference in syntax:

/// <summary>
/// Given a receptor in a membrane, a semantic type put into that membrane is received by that receptor.
/// </summary>
[Test]
public void ReceptorReceivesSemanticTypeOnItsMembrane()
{
  callSuccess = false;
  SemanticProcessor sp = new SemanticProcessor();
  sp.Register<TestMembrane>(new TestReceptor());
  sp.ProcessInstance<TestMembrane, TestSemanticType>(true);
  Assert.That(callSuccess, "Expected TestReceptor.Process to be called.");
}

Note how the Register function has a membrane type generic parameter but is passed a receptor instance.

Complex Type Processing

Semantic types can be composed of other semantic types.  As in HOPE, when a complex type is instantiated into a membrane space, not only receptors that process that base type but also receptors that process its compositional type should be invoked.  This allows us to create complex type systems while still processing lower level types of which the more complex types are composed.

To test this, we need some scaffolding:

public static bool simpleTypeProcessed;
public static bool complexTypeProcessed;

public class TestMembrane : Membrane { }
public class SimpleType : ISemanticType { }
public class ComplexType : ISemanticType
{
  public SimpleType ASimpleType { get; set; }

  public ComplexType()
  {
    ASimpleType = new SimpleType();
  }
}

public class ComplexReceptor : IReceptor<ComplexType>
{
  public void Process(ISemanticProcessor pool, IMembrane membrane, ComplexType obj)
  {
    complexTypeProcessed = true;
  }
}

public class SimpleReceptor : IReceptor<SimpleType>
{
  public void Process(ISemanticProcessor pool, IMembrane membrane, SimpleType obj)
  {
    simpleTypeProcessed = true;
  }
}

We have a single integration test to verify that the inner "simple" type is processed as well when the complex type is instantiated into the membrane:

[Test]
public void ComplexTypePropertyProcessing()
{
  simpleTypeProcessed = false;
  complexTypeProcessed = false;
  SemanticProcessor sp = new SemanticProcessor();
  sp.Register<TestMembrane, ComplexReceptor>();
  sp.Register<TestMembrane, SimpleReceptor>();
  sp.ProcessInstance<TestMembrane, ComplexType>(true);
  Assert.That(complexTypeProcessed, "Expected ComplexReceptor.Process to be called.");
  Assert.That(simpleTypeProcessed, "Expected SimpleReceptor.Process to be called.");
}

Behind the scenes, reflection is used to discover public properties whose type implements ISemanticType:

/// <summary>
/// Any public properties that are of ISemanticType type and not null are also emitted into the membrane.
/// </summary>
protected void ProcessInnerTypes(IMembrane membrane, IMembrane caller, ISemanticType obj, bool processOnCallerThread)
{
  var properties = obj.GetType().GetProperties(BindingFlags.Instance | BindingFlags.Public).
     Where(pi => pi.PropertyType.GetInterfaces().Contains(typeof(ISemanticType)));

  properties.ForEach(pi =>
  {
    ISemanticType prop = (ISemanticType)pi.GetValue(obj);

    if (prop != null)
    {
      ProcessInstance(membrane, caller, prop, processOnCallerThread);
    }
  });
}

Membrane Permeability

While earlier I stated that membranes are containers for receptors, they are also hierarchical filters for semantic types.  A semantic type can permeate into a membrane, or it can permeate out, into another membrane.  We'll use that behavior later when discussing semantic distributed computing, but first, we have some tests that ensure that membrane permeability works the way we want it to.  As usual, we have some scaffolding:

public static bool callSuccess;

class TestMembrane : Membrane { }
class OuterMembrane : Membrane { }
class InnerMembrane : Membrane { }
class InnerMembrane2 : Membrane { }
public class TestSemanticType : ISemanticType { }

public class TestReceptor : IReceptor
{
  public void Process(ISemanticProcessor proc, IMembrane membrane, TestSemanticType t)
  {
    callSuccess = true;
  }
}

I've added some pictures here to illustrate what's going on.

Even though hierarchical, you can think of membranes as more being three dimensional, such that even though the inner membrane is outbound permeable to a type, the outer membrane (the membrane it sits on) must be inwardly permeable to that type as well!

Permeate Out

/// <summary>
/// Verify that, when the inner membrane is permeable outbound to a type,
/// that a receptor in the outer membrane, permeable inbound to that type, receive the type.
/// </summary>
[Test]
public void TypePermeatesOut()
{
  callSuccess = false;
  SemanticProcessor sp = new SemanticProcessor();
  sp.OutboundPermeableTo<InnerMembrane, TestSemanticType>();
  sp.InboundPermeableTo<OuterMembrane, TestSemanticType>();
  sp.AddChild<OuterMembrane, InnerMembrane>();
  sp.Register<OuterMembrane, TestReceptor>();
  sp.ProcessInstance<InnerMembrane, TestSemanticType>(true);
  Assert.That(callSuccess, "Expected receptor in outer membrane to process the ST placed in the inner membrane.");
}

Permeate In

/// <summary>
/// Verify that, when the inner membrane is permeable inbound to a type,
/// that a receptor in the inner membrane receives the type.
/// </summary>
[Test]
public void TypePermeatesIn()
{
  callSuccess = false;
  SemanticProcessor sp = new SemanticProcessor();
  sp.OutboundPermeableTo<OuterMembrane, TestSemanticType>();
  sp.InboundPermeableTo<InnerMembrane, TestSemanticType>();
  sp.AddChild<OuterMembrane, InnerMembrane>();
  sp.Register<InnerMembrane, TestReceptor>();
  sp.ProcessInstance<OuterMembrane, TestSemanticType>(true);
  Assert.That(callSuccess, "Expected receptor in inner membrane to process the ST placed in the outer membrane.");
}

Permeate Across

Given that permeating out and permeating in works, permeating across, via the other membrane, should also work:

/// <summary>
/// Verify that a type issued in one inner membrane can cross over to
/// an adjacent inner membrane via outbound permeability on the source
/// and inbound permeability on the target membrane.
/// </summary>
[Test]
public void TypePermeatesAcross()
{
  callSuccess = false;
  SemanticProcessor sp = new SemanticProcessor();
  sp.OutboundPermeableTo<InnerMembrane, TestSemanticType>();
  sp.InboundPermeableTo<OuterMembrane, TestSemanticType>();
  sp.OutboundPermeableTo<OuterMembrane, TestSemanticType>();
  sp.InboundPermeableTo<InnerMembrane2, TestSemanticType>();
  sp.AddChild<OuterMembrane, InnerMembrane>();
  sp.AddChild<OuterMembrane, InnerMembrane2>();
  sp.Register<InnerMembrane2, TestReceptor>();
  sp.ProcessInstance<InnerMembrane, TestSemanticType>(true);
  Assert.That(callSuccess, "Expected receptor in inner membrane to process the ST placed in the adjacent inner membrane.");
}

Not Permeable Tests

And lastly, we want to ensure that if outbound or inbound permeability is not established, the semantic type does not permeate out of its membrane to either an outer or inner membrane.  The code that prevents permeation has been commented out.

/// <summary>
/// Outer membrane does not receive semantic type if inner membrane is not outbound permeable to it.
/// </summary>
[Test]
public void NotPermeableOut()
{
  callSuccess = false;
  SemanticProcessor sp = new SemanticProcessor();
  // sp.OutboundPermeableTo<InnerMembrane, TestSemanticType>();
  sp.InboundPermeableTo<OuterMembrane, TestSemanticType>();
  sp.AddChild<OuterMembrane, InnerMembrane>();
  sp.Register<OuterMembrane, TestReceptor>();
  sp.ProcessInstance<InnerMembrane, TestSemanticType>(true);
  Assert.That(!callSuccess, "Expected receptor in outer membrane to NOT receive the ST placed in the inner membrane.");
}

/// <summary>
/// Outer membrane does not receive semantic type if it is not inbound permeable to it.
/// </summary>
[Test]
public void NotPermeableIn()
{
  callSuccess = false;
  SemanticProcessor sp = new SemanticProcessor();
  sp.OutboundPermeableTo<InnerMembrane, TestSemanticType>();
  // sp.InboundPermeableTo<OuterMembrane, TestSemanticType>();
  sp.AddChild<OuterMembrane, InnerMembrane>();
  sp.Register<OuterMembrane, TestReceptor>();
  sp.ProcessInstance<InnerMembrane, TestSemanticType>(true);
  Assert.That(!callSuccess, "Expected receptor in outer membrane to NOT receive the ST placed in the inner membrane.");
}

Behind the scenes, this whole issue of permeability is handled in by small method:

/// <summary>
/// Traverse permeable membranes without calling back into the caller. While membranes should not be bidirectionally
/// permeable, this does stop infinite recursion if the user accidentally (or intentionally) configured the membranes thusly.
/// </summary>
protected void PermeateOut(IMembrane membrane, IMembrane caller, ISemanticType obj, bool processOnCallerThread)
{
  List<IMembrane> pmembranes = ((Membrane)membrane).PermeateTo(obj);
  pmembranes.Where(m => m != caller).ForEach((m) => ProcessInstance(m, membrane, obj, processOnCallerThread));
}

In order to stop bounce-back (A is permeable to B and B is permeable to A) we track who the caller is so that when we traverse up or down the membrane hierarchy, we do not traverse back to ourselves!

Of course, the real workhorse is the PermeateTo method:

/// <summary>
/// Given this membrane's outbound list, what membranes are inbound permeabe to the ST as well?
/// </summary>
public List<IMembrane> PermeateTo(ISemanticType st)
{
  List<IMembrane> ret = new List<IMembrane>();
  Type sttype = st.GetType();

  if (outboundPermeableTo.Contains(sttype))
  {
    // Can we traverse to the parent?
    if ((parent != null) && (parent.inboundPermeableTo.Contains(sttype)))
    {
      ret.Add(parent);
    }

    // Can we traverse to children?
    foreach (Membrane child in childMembranes)
    {
      if (child.inboundPermeableTo.Contains(sttype))
      {
        ret.Add(child);
      }
    }
  }

  return ret;
}

Distributed Semantic Computing

DSC Membrane: Distributed Semantic Computing Membrane
DCR: Distributed Computing Receptor
ST: Semantic Type

The pièce de résistance is the ability to create a stateful receptor that implements a web server (in this case, the Basic Web Server I wrote about previously.)  Using Newtonsoft's Json.NET serialization, we can easily serialize and deserialize a semantic type into/from JSON.  While not the most efficient serialization format, I've chosen this format because it gives you an idea of where I'm moving to next -- semantic computing on the web.  But for now, back to the integration test that demonstrates how we can distribute the computation of semantic types.

Again, we need scaffolding for the membranes and receptors:

public static string received;

public class TestMembrane : Membrane { }

public class TestReceptor : IReceptor
{
  public void Process(ISemanticProcessor proc, IMembrane membrane, TestDistributedSemanticType t)
  {
    received = t.Message;
  }
}

public class DistributedProcessMembrane : Membrane { }

// For unit test support. Normally, each distributed system would either declare its own types
// or share types through a common assembly.
public class TestDistributedSemanticType : ISemanticType
{
  public string Message { get; set; }
}

Not much here.  However, the setup for the test is a bit involved:

/// <summary>
/// Verify that a semantic type is received on a "remote" semantic processor.
/// </summary>
[Test]
public void DistributedComputation()
{
  SemanticProcessor spOut = new SemanticProcessor();
  SemanticProcessor spIn = new SemanticProcessor();

  received = "";
  OutboundDistributedComputingReceptor dcrOut = new OutboundDistributedComputingReceptor(4002);
  InboundDistributedComputingReceptor dcrIn = new InboundDistributedComputingReceptor(4002, spIn);

  // Create an "emitter" in which a semantic type emitted on the TestMembrane permeates
  // into the inner DistributedProcessMembrane for our test type.
  spOut.AddChild<TestMembrane, DistributedProcessMembrane>();
  spOut.OutboundPermeableTo<TestMembrane, TestDistributedSemanticType>();
  spOut.InboundPermeableTo<DistributedProcessMembrane, TestDistributedSemanticType>();

  // The stateful DCR out lives in the distributed process membrane.
  spOut.Register<DistributedProcessMembrane>(dcrOut);

  // Create a "receiver" in which a semantic type is received on the inner DistributedProcessMembrane
  // and the test type permeates out to a "handler" receptor.
  spIn.AddChild<TestMembrane, DistributedProcessMembrane>();
  spIn.OutboundPermeableTo<DistributedProcessMembrane, TestDistributedSemanticType>();
  spIn.InboundPermeableTo<TestMembrane, TestDistributedSemanticType>();
  
  // The stateful DCR in lives in the distributed process membrane.
  spIn.Register<DistributedProcessMembrane>(dcrIn);
  // The responding receptor lives in the TestMembrane
  spIn.Register<TestMembrane, TestReceptor>();

  // Put a semantic type instance on the outbound side.
  spOut.ProcessInstance<TestMembrane, TestDistributedSemanticType>((t) =>
  {
    t.Message = "Hello World";
  });

  // Wait a bit for threads to do their thing and Http posts to do their things.
  // !*!*!*!* Sometimes this wait must be longer -- the unit test engine can really slow things down.
  // !*!*!*!* This is particularly true when running the test in the debugger!
  // !*!*!*!* If this delay isn't long enough for the server's message to be processed, you will get
  // !*!*!*!* errors related to accessing objects on an unloaded AppDomain.
  // !*!*!*!* In real life this woudn't happen -- this is an artifact of unit testing a complex
  // !*!*!*!* multi-threaded process.
  //Thread.Sleep(500);

  // Because we know it works, we could actually do this, which is particularly useful when we're
  // debugging and single stepping through code -- we do not want the test in this AppDomain
  // to exit prematurely!
  while (String.IsNullOrEmpty(received))
  {
    Thread.Sleep(0);
  }

  Assert.That(received == "Hello World", "Expected to receive 'Hello World'");
}

There's a lot going on here:

  1. Create two semantic processors, one for the outbound message, one for the inbound message
  2. On the outbound processor, declare:
    1. the two membranes
    2. the outbound distributed computing receptor
    3. the permeability of the two membranes
  3. On the inbound processor, declare:
    1. the two membranes
    2. the outbound distributed computing receptor
    3. the test receptor that processes the semantic type
    4. the permeability of the two membranes

Behind the scenes are the distributed computing receptors. 

Outbound Distributed Semantic Receptor

This receptor is responsible for serializing the semantic type and posting it to our server:

public class OutboundDistributedComputingReceptor : IReceptor<ISemanticType>
{
  protected int outboundPort;

  public OutboundDistributedComputingReceptor(int outboundPort)
  {
    this.outboundPort = outboundPort;
  }

  public void Process(ISemanticProcessor proc, IMembrane membrane, ISemanticType obj)
  {
    string url = String.Format("http://localhost:{0}/semanticType", outboundPort);
    string json = JsonConvert.SerializeObject(obj);
    // Insert our type name:
    json = "{\"_type_\":\"" + obj.GetType().FullName + "\"," + json.Substring(1);
    HttpWebRequest request = (HttpWebRequest)WebRequest.Create(url);
    request.Method = "POST";
    request.ContentType = "application/json";
    request.ContentLength = json.Length;
    Stream st = request.GetRequestStream();
    byte[] bytes = Encoding.UTF8.GetBytes(json);
    st.Write(bytes, 0, bytes.Length);
    st.Close();
  }
}

We employ a couple tricks:

  1. The receptor receives all semantic type objects, so we rely on membrane filtering to pass through into our inner membrane only the semantic types we want processed remotely.
  2. We inject "_type_" into the JSON so that we know what type to deserialize into on the other side.

Inbound Distributed Semantic Receptor

On the inbound side, we set up the server to listen to the "/semanticType" path and rehydrate the semantic type.  It is posted to our inner membrane on the second semantic processor, and again we rely on membrane filtering to permeate the desired types out to the outer membrane, where our test receptor sits, waiting for the appropriate semantic type:

public class InboundDistributedComputingReceptor : IReceptor
{
  protected SemanticProcessor sp; // the processor for the inbound types.
  protected Server server;
  protected int outboundPort;

  public InboundDistributedComputingReceptor(int inboundPort, SemanticProcessor sp)
  {
    this.sp = sp;

    server = new Server();
    server.OnRequest = (session, context) =>
    {
      session.Authenticated = true;
      session.UpdateLastConnectionTime();
    };

    server.AddRoute(new Route() { Verb = Router.POST, Path = "/semanticType", Handler = new AnonymousRouteHandler(server, ProcessInboundSemanticType) });
    server.Start("", inboundPort);
  }

  protected ResponsePacket ProcessInboundSemanticType(Session session, Dictionary<string, object> parms)
  {
    string json = parms["Data"].ToString();
    JObject jobj = JObject.Parse(json);
    string type = jobj["_type_"].ToString();

    // strip off the _type_ so we can then instantiate the semantic type.
    json = "{" + json.RightOf(',');
    
    // Requires that the namespace also matches the remote's namespace.
    Type ttarget = Type.GetType(type); 
    ISemanticType target = (ISemanticType)Activator.CreateInstance(ttarget);
    JsonConvert.PopulateObject(json, target);
    sp.ProcessInstance<DistributedProcessMembrane>(target);

    ResponsePacket ret = new ResponsePacket() { Data = Encoding.UTF8.GetBytes("OK"), ContentType = "text" };

    return ret;
  }
}

And because it's so amazing, here is the passing integration test:

Conclusion

Type First Development (coined by Tomas Petricek) is applicable to imperative languages as well as functional languages.  We can use the C#'s type system to create rich types and declaratively establish the relationship between types and the methods that process those types.  We can also create containers (membranes) to create computational islands and control the flow of type instances between computational islands.  By using a semantic processor, the membranes, types and receptors that are declared in a "semantic system" becomes a expressive computational unit.  Specialized receptors, such as the distributed receptors illustrated in this article, demonstrate how easy it is to create a distributed semantic computing system.

OK, enough of the hifalutin talk.  This stuff is damn cool!

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)

Share

About the Author

Marc Clifton
United States United States
Marc is the creator of two open source projects, MyXaml, a declarative (XML) instantiation engine and the Advanced Unit Testing framework, and Interacx, a commercial n-tier RAD application suite.  Visit his website, www.marcclifton.com, where you will find many of his articles and his blog.

Marc lives in Philmont, NY.

You may also be interested in...

Comments and Discussions

 
GeneralMy vote of 5 Pin
D V L27-Oct-15 1:18
professionalD V L27-Oct-15 1:18 
QuestionI like the name Pin
Cyron437-Aug-15 2:35
memberCyron437-Aug-15 2:35 
QuestionComplicated Pin
FatCatProgrammer27-Jul-15 9:02
memberFatCatProgrammer27-Jul-15 9:02 
AnswerRe: Complicated Pin
Marc Clifton27-Jul-15 9:40
protectorMarc Clifton27-Jul-15 9:40 
GeneralRe: Complicated Pin
FatCatProgrammer28-Jul-15 4:46
memberFatCatProgrammer28-Jul-15 4:46 
GeneralRe: Complicated Pin
Marc Clifton29-Jul-15 4:09
protectorMarc Clifton29-Jul-15 4:09 

General General    News News    Suggestion Suggestion    Question Question    Bug Bug    Answer Answer    Joke Joke    Praise Praise    Rant Rant    Admin Admin   

Use Ctrl+Left/Right to switch messages, Ctrl+Up/Down to switch threads, Ctrl+Shift+Left/Right to switch pages.

| Advertise | Privacy | Terms of Use | Mobile
Web02 | 2.8.160721.1 | Last Updated 24 Jul 2015
Article Copyright 2015 by Marc Clifton
Everything else Copyright © CodeProject, 1999-2016
Layout: fixed | fluid