Click here to Skip to main content
Click here to Skip to main content

Tagged as

Go to top

AFP: Almost Functional Programming in C#: Part 2

, 6 Mar 2013
Rate this:
Please Sign up or sign in to vote.
Putting the programming style to the test by using it in a multi-threaded server.

Where is the Functional in Programming?

We left the last article with a promise that we will rewrite that imperative constructor, which we did not like too much, as it was not Functional enough. So the revisited code looks like this, and I promise you that it calculates the exact same result: 

public Portfolio(IEnumerable<Tuple<string, int, string>> stocklist)
{
    _stocks = stocklist.ToDictionary(x => x.Item1, x => Source<Price>());
    var stocksByCurrency = stocklist.GroupBy(x => x.Item3);
    _rates = stocksByCurrency.ToDictionary(x => x.Key, x => Source<double>());
    Result =
        Apply(
            stocksByCurrency.Select(
                x => Function(
                    Apply(
                        x.Select(
                            y => Function(
                                _stocks[y.Item1],
                                Constant(y.Item2),
                                (stock, num) => stock * num)
                        ).Cast<IValue<Price>>().ToList(),
                        (p1, p2) => p1 + p2),
                    _rates[x.Key],
                    (sum, curr) => sum * curr)
                ).Cast<IValue<Price>>().ToList(),
            (p1, p2) => p1 + p2);
} 

While it takes fewer lines, or more precisely we could write the parameters in the same line to get rid of some short lines, it is already quite hard to read this version, we really should not obfuscate it further. If we would remove all the indentation, what I had to introduce by hand by the way, as Visual Studio 2010 is not able to correctly format these monstrous expressions, we could see all the advantages and disadvantages of FP in one single code line. While it would show the real power of Functional Programming, that we can achieve almost anything by chaining together functions after functions, it would show the problems too, that it can become incomprehensible fast. As this happens all the time, we can make it more readable by using more meaningful names for the lambda parameters than x and y, or structuring the processing into smaller functions which have meaningful names. We can also introduce new variable names to hold some intermediate results like stocksByCurrency does, or Select into anonymous types to have meaningful property names rather than Item1 and Item2. Unfortunately C# was not designed as a Functional Programming Language from the start, and the last three improvements have drawbacks.

  1. Splitting processing into smaller functions can be quite tricky because of two reasons. First, it is possible that we simply cannot name it in any meaningful way. It is the same problem we will have naming intermediate result. Can we name a function as ApplyFilterThenGroupByThenAggregate, or a variable as afterFilterAndGroupByAndAggregate? Imperative programming at least gives us some hints what is happening, but when we naturally link 10 functions, sometimes it is not that clear how to name the result of the 5th linking. Of course we can always use a, b, x and y as variable names as a last resort... In case of functions we also have to provide a return value type which sometimes can be quite tricky as well. Can you guess what is wrong with IGrouping<int, IOrderedQueryable<ILookup<string, Order>>>? As long as we keep everything in a simple function, type inference makes wonders, but when we want to put it into a function signature, it can bite us hard.

  2. We can introduce new variable names which hold intermediate results by either using the let keyword or using statement lambdas. The problem of the let keyword is that it simply introduces a new anonymous type, so its drawback will be discussed there. The statement lambdas (the lambdas with curly braces), on the other hand look perfect at first sight. Actually they look exactly like the Portfolio constructor before we transformed it into a functional algorithm. If we have to use curly braces all the time, we may consider not forcing pure Functional Programming, as it is quite possible that the problem domain is not a good match for FP. Yes, there are problems which can be elegantly solved by using FP, and there are other ones which require OOP thinking. But relax, our pension fund is not one of them...

  3. Anonymous types have the problem that they are ugly, and that they are anonymous. We can forgive ugliness but it is a real pain when we cannot define the type as a function parameter or return type, or when we have to define the type parameters of generics because we hit the limitations of type inference.

So there can be problems creeping all along the road, and in the end there seems to be no perfect way to solve these issues. The stark reality is that a Real Functional Programming Language is the one, where all these little puzzle pieces click into each other without resistance and we only have to focus on the problem we want to solve, and not on the underlying infrastructure. If you are interested in one, you can try Clojure anytime. Obviously it will not solve the how-to-name-things problem, that will remain your job forever.

On the other hand, as we seem to be stuck in C# land, the one and only rule is that you should not force Really Pure Functional Programming ever. While the language provides some infrastructure and syntax, and frankly quite good at it compared to other languages, it has its limitations and you should not push its limits. If you can only express your solution to your problem in some imperative way then go with it. There is no FP Police who can fine you for driving too slow. The only criteria that you should never ever break is that you have to write pure functions. It simply means that under no circumstances shall you modify any state which already exists, you always just have to read the input parameters and create a completely new output state from them. In short, that Almost Functional Programming is.

Functional I/O 

Probably you have already guessed that we cannot write I/O using pure functions, as it is all about modifying the state of the universe. Unfortunately C# is not Haskell, where this little problem is solved by using Monads, we have to devise a solution to separate our beloved pure functions and the I/O operations. Fortunately C# is not Haskell, where you cannot create impure functions, we can just call I/O in the good old Object Oriented Programming way, if such thing exists at all, and only need some way to execute the pure functions. If we think about this problem, it is trivial to see that we cannot execute pure functions in response to some I/O, as by definition we should throw away the result of the calculation as we learned last time. So every pure function execution must finish by either storing the result, or doing some I/O, making all invocations to be an Action type. Every such action will depend on at least two types of information, the current packet it is processing, which for example can be the current prices of a stock in a market update, and also some kind of global configuration or global program state.

Handling global program state is a very tricky thing to do, and it is exactly the thing we usually do not care about, we just grow the software and put locks here and there whenever we think that multiple threads can access the same data structure. Sometimes we put a read-write lock there when we realize that update is very rare, or try to create some clever lock-free data structure to hide some unintended threading bugs into the code. But every step on the way we just hope that this time it will succeed, unlike the last time when the number of different locks surpassed our cognitive capabilities, or trashed the scheduler until it ground to a halt. As Rich Hickey, the creator of Clojure said: "Mutable stateful objects are the new spaghetti code", and this is the thing we have to avoid at all costs.

There is a very trivial thing we have to realize if we want to avoid mutating the global program state, and it is that the only thing we can do is to create a new program state, which has the required modifications, and transition all processing from the old state to the new state explicitly. Fortunately creating a copy of the immutable program state is not that expensive as it seems, as we can share most data with the old state, as it is read-only by definition. It is obvious that now that we have more than one state, we cannot use static variables, or god forbid singletons, to hold it. We have to explicitly pass it as a parameter into every function which uses it, and this explicitness is what we have to strive for. Explicit parameter passing is good, as it documents the data flow in our program, and explicit data flow is king in multi-threaded applications. There is no way forgetting to update some shared data, or not passing it in as a parameter, everything happens explicitly, and everything is documented in the function signatures. There is a big difference compared to our usual multi-threaded application code is that we now have to define the transition from the old state to the new state explicitly, while this code is probably completely missing from the old style, object state mutating code. On one hand it is easier not writing code than writing code, but on the other hand if we do not specify the transition explicitly then the system will behave in some accidental way. It is possible that this is what we need, but in the majority of cases it results in unintended behavior and forces defensive programming. For example, if we query some data from a Dictionary, and later query some other data from the same Dictionary with the same key, we cannot be sure whether the second time it will succeed or not, as the Dictionary can be changed while we are running. We have to prepare for the case when something has been changed under our algorithm, and no amount of locking can fix this issue. In case of utilizing immutable program state, this possibility simply cannot happen, we can even hold a reference to the queried data during the processing, as we do not have to re-query it since that would result in the same data anyway.

In our pension fund system the global program data will be the configuration of portfolios, as we will handle more than one of them, and the subscribed market data, which is completely not Functional and depends on the configuration. As every portfolio contains one or more stocks, in order to update to current price of the stocks, we have to subscribe to the data updates. By the way there is no connection between the link and what we will do, I just included it here as an interesting distraction. Since we do not want to pay for the market data, we will generate prices randomly. There are people saying that stock prices are completely made up so let them be right this time. We will however share the subscriptions between portfolios, as we would do in case of real subscriptions. It means that every subscription has one or more portfolios it can affect.

Modifying the expression tree in our Portfolio class would be trivial to handle more than one portfolio, and it would minimize the amount of calculations as well, completely automatically. However to utilize multicore CPUs, we will instead process every portfolio simultaneously on the ThreadPool. Unfortunately market data updates have the property that they have to keep their order, as it is absolutely not irrelevant whether the price goes up or down so we have to invent some clever way to schedule updates. Unfortunately the ThreadPool has the property that it processes queued packets out-of-order on different threads. After thinking of it for a while, we can give up the notion that we are scheduling the information that “here is a new update” and realize that we can schedule the information that “here is the queue which can have the new update”. Once we got rid of our mental shackles, coding is easy: 

public sealed class ThreadPoolQueue<T>
{
    static readonly WaitCallback _callback = (self) => ((ThreadPoolQueue<T>)self).Callback();
    readonly Action<T> _func;
    readonly Queue<T> _queue;
    bool _scheduledOrRunning;

    public ThreadPoolQueue(Action<T> func)
    {
        _func = func;
        _queue = new Queue<T>();
        _scheduledOrRunning = false;
    }

    public void Enqueue(T item)
    {
        bool needQueueWorkItem;
        lock (_queue) {
            _queue.Enqueue(item);
            needQueueWorkItem = !_scheduledOrRunning;
            _scheduledOrRunning = true;
        }
        if (needQueueWorkItem)
            ThreadPool.QueueUserWorkItem(_callback, this);
    }

    void Callback()
    {
        try {
            T item;
            lock (_queue) {
                item =  _queue.Dequeue();
            }
            try {
                _func(item);
            } catch(Exception ex) {
                Console.WriteLine("ThreadPoolQueue callback failed: {0}", ex.Message);
            }
            bool needQueueWorkItem;
            lock (_queue) {
                needQueueWorkItem = _queue.Count > 0;
                _scheduledOrRunning = needQueueWorkItem;
            }
            if (needQueueWorkItem)
                ThreadPool.QueueUserWorkItem(_callback, this);
        } catch {
            _scheduledOrRunning = false;
        }
    }
}

This is the simplified version of the solution, in real code we better remove and process batches of items from _queue instead of one by one, so if one ThreadPoolQueue receives much more items than the others, it will be processed faster as well. It would also help by keeping the accessed data in the CPU data cache as the processing would be more localized. The code is trivial, practically we just protect ourselves from queuing two user work items for the same ThreadPoolQueue by setting _scheduledOrRunning appropriately. If we would process items in batches then we could Enqueue 2 million items per second, when all the CPU time is spent on retrying some lock-free operation in ThreadPool. In case of less contention we can Enqueue more items, so it is considered fast enough. If we have to tune it then we can get rid of the locks and use lock-free algorithms and this can win some minor performance gains. If you need more, then you have to practically re-implement ThreadPool in a better way, good luck with it...

Putting it all together

Now that we have all the necessary infrastructure, it is time to finally write some pure functions to handle our domain specific problems. As we all know, in the Kingdom of Nouns, every Verb must be escorted at all times by a Noun. So we have to invent a Noun for our program, and after careful thought I decided to call it Program. Because I became very tired while coming up with this striking name, I do not want to think about more Nouns, so Program will be a partial class, and I will create files like Startup.cs and LoadConfig.cs to categorize the functions for now.

As all the data will be immutable, the classes holding data will not contain any member functions other that the constructor, and even the constructors will not do any work, just check the parameters and initialize the readonly fields.

It means that the Portfolio constructor will have to be changed as well. Unfortunately it uses the simplified node creating constructs inherited from Model, so Model has to be explicitly specified everywhere. Argh!

void ProcessUpdate(PriceUpdate update)
{
    var state = _currentState;
    var subscription = state.SubscriptionHash[update.StockId];
    subscription.Queue.Enqueue(new Packet(state, subscription, update));
}

We will have only one member field which can be changed in the Program class, it is volatile GlobalState _currentState. We use volatile so that all the packet processing code will pick up the actual value and pass forward. This guarantees that all the processing, which occurs in response for some new data, will run in the same time period. We can visualize it that we divide runtime into periods when there is no change in global state. In order to keep the illusion of frozen time, while the packets are queued we have to remember the time period the packet started to execute in. We also remember the subscription object so we do not have to look it up again in ProcessPacket. We do not have to remember the Program instance though as it is stored in the delegate we passed into the ThreadPoolQueue constructor.

ProcessUpdate is called from FakeConnection, in a real software of course we have to handle a real connection to some real server. What is the same however is that every incoming packet has some id which identifies the object we have to act on, and we have to look it up in some Dictionary. In our case this is the StockId, but in other software it can be an employee id, connection lane id or anything else that can identify one of the multiplexed data streams. What we are doing in this server is to separate processing into immutable program state which is stored in _currentState and has some complex explicit state transitioning code, and the fast path which processes incoming market data and stores state in the incremental calculation framework. As the immutable data is immutable, instead of querying by id everywhere, we can easily resolve all keys into the object references itself in the complex explicit state transitioning code. It has the advantage that it is faster, and we do not have to write defensive code in the fast path, as dereferencing cannot fail if we check during object creation time that no pointers are null. The net effect is that in a correctly written Functional Programming server there should be exactly one Dictionary lookup in the fast path. If you check our code then you can count some extra ones in UpdateStockPrice and UpdateCurrencyRate however. Argh!

Before we dive into the complex explicit state transitioning code, we have to invent one function which can compare two somethings, as the most frequent action we will do is to compare Portfolios, Subscriptions and the list of Portfolios subscribed to the same Subscriptions. Since we foresee that the comparison between two somethings can result in more than the information of whether they are the same or not, instead of comparing by IEquatable<T> or something similar, we categorize the differences so we can express how they differ.

public enum Cmp
{
    Nothing,
    NeedsStop,
    NeedsStart,
    NeedsRestart,
}

In case of Portfolios we can take four actions depending on the difference. If we read a new configuration and there is a portfolio in it which did not exist in the old configuration, then we have to start it, which means that we have to build the incremental calculation AST and subscribe to the required market data. If the portfolio exists only in the old configuration then we have to stop calculating it. In case the portfolio exists in both configurations but differs in something, like the Count of some stocks, or the list of stocks then it has to be restarted, which is stopping, creating a new one and starting. If nothing changed then we do nothing, in that case we have to copy the old Portfolio object into the new global state.

public enum Sub
{
    NeedsSubscribe,
    NeedsUnsubscribe,
    NeedsChange,
} 

To keep things simple, in case of a Subscription we do not compare the list of old and new subscribed Portfolio objects, we recreate this list every time. Subscribe and Unsubscribe works exactly as Start/Stop above.

To handle the difference between these two comparison categories, we create a completely generic function:  

public static List<Tuple<C, T1, T2>> Categorize<C, K, T1, T2>(
    C leftOnly, IEnumerable<T1> leftList, Func<T1, K> getLeftKey,
    C rightOnly, IEnumerable<T2> rightList, Func<T2, K> getRightKey,
    Func<T1, T2, C> getCategory)
{
    var result = new List<Tuple<C, T1, T2>>();
    var leftHash = leftList.ToDictionary(getLeftKey);
    var rightHash = rightList.ToDictionary(getRightKey);
    foreach (var kvl in leftHash) {
        T2 rightValue;
        if(rightHash.TryGetValue(kvl.Key, out rightValue))
            result.Add(Tuple.Create(getCategory(kvl.Value, rightValue), kvl.Value, rightValue));
        else
            result.Add(Tuple.Create(leftOnly, kvl.Value, default(T2)));
    }
    foreach (var kvr in rightHash) {
        if (!leftHash.ContainsKey(kvr.Key))
            result.Add(Tuple.Create(rightOnly, default(T1), kvr.Value));
    }
    return result;
}

This function pairs objects by unique keys, the functions which can extract the keys are specified in getLeftKey and getRightKey. Those parameters also specify K, the type of the key, and they better be matching. We have two element types T1 and T2 as usually we compare the configuration data of the new configuration to the actual running objects in the current global state. The parameters leftOnly and rightOnly are the comparison results when the elements in the list cannot be paired and only one of them exists. In case we can pair then the comparison result is given by getCategory.

Note that this function expects absolutely nothing from the types C, K, T1 and T2, it is completely generic, and we tailor it to the actual domain specific class by passing in functions. Actually we should store it in a useful library as it has nothing to do with market data processing, but then we have to invent a Noun for this Verb. And as I said, I am very tired to decide between Utility or Helper so I just put it into Program. For example we could use a class with fields, instead of an enum, to store some data from the comparison, in order to allow getCategory to define and pass to the caller every information about how to resolve the difference.

Finally comes the ConfigChange function, I hope that you will be able to understand it. I have used the Sequential Temporal Composition design pattern and it resulted in the following code:

public virtual void ConfigChange(XmlConfig newConfig, GlobalState oldState)
{
    var marketDataHash = newConfig.MarketDataList.ToDictionary(x => x.Id);

    var portfolioChange = Categorize(
        Cmp.NeedsStart, newConfig.PortfolioList, x => x.Id,
        Cmp.NeedsStop, oldState.PortfolioHash.Values, x => x.Config.Id,
        (n, o) => HasChanged(n, o) ? Cmp.NeedsRestart : Cmp.Nothing);

    var newPortfolioHash = new Dictionary<string, Portfolio>();
    foreach (var p in portfolioChange) {
        switch (p.Item1) {
            case Cmp.Nothing:
                newPortfolioHash.Add(p.Item3.Config.Id, p.Item3);
                break;
            case Cmp.NeedsStart:
            case Cmp.NeedsRestart:
                newPortfolioHash.Add(p.Item2.Id,
                    CreatePortfolio(p.Item2, marketDataHash));
                break;
            case Cmp.NeedsStop:
                // just leave out the object from the new list
                break;
        }
    }

    var newPortfolioConfigList = portfolioChange.Where(x => x.Item1 != Cmp.NeedsStop).Select(x => x.Item2);
    var newSubscriptionIdList = newPortfolioConfigList.SelectMany(
        x => x.StockList.Select(
            y => new { PortfolioId = x.Id, StockId = y.StockId }).
            Union(x.StockList.Where(y => marketDataHash.ContainsKey(y.StockId)).Select(
            y => new { PortfolioId = x.Id, StockId = marketDataHash[y.StockId].CurrencyId }))).Distinct();
    var portfolioBySubscription = newSubscriptionIdList.GroupBy(x => x.StockId, x => x.PortfolioId);
    var subscribeChange = Categorize(
        Sub.NeedsSubscribe, portfolioBySubscription, x => x.Key,
        Sub.NeedsUnsubscribe, oldState.SubscriptionHash.Values, x => x.Config.Id,
        (n, o) => Sub.NeedsChange);

    var newSubscriptionHash = new Dictionary<string, Subscription>();
    foreach (var s in subscribeChange) {
        switch (s.Item1) {
            case Sub.NeedsSubscribe:
                MarketDataConfig cfg;
                if (!marketDataHash.TryGetValue(s.Item2.Key, out cfg)) {
                    Console.WriteLine("Invalid subscription: {0}", s.Item2.Key);
                } else {
                    newSubscriptionHash.Add(
                        s.Item2.Key,
                        new Subscription(cfg, s.Item2.Select(x => newPortfolioHash[x]).ToList(),
                            new ThreadPoolQueue<Packet>(ProcessPacket)));
                }
                break;
            case Sub.NeedsUnsubscribe:
                // just leave out the object from the new list
                _connection.Unsubscribe(s.Item3.Config.Id);
                break;
            case Sub.NeedsChange:
                newSubscriptionHash.Add(
                    s.Item3.Config.Id,
                    new Subscription(s.Item3.Config, s.Item2.Select(x => newPortfolioHash[x]).ToList(), s.Item3.Queue));
                break;
        }
    }

    _currentState = new GlobalState(newConfig, newSubscriptionHash, newPortfolioHash);

    foreach (var s in subscribeChange.Where(x => x.Item1 == Sub.NeedsSubscribe)) {
        _connection.Subscribe(s.Item2.Key);
    }
}

public static bool HasChanged(PortfolioConfig newConfig, Portfolio oldState)
{
    var difference = Categorize(
        true, newConfig.StockList, x => x.StockId,
        true, oldState.Config.StockList, x => x.StockId,
        (n, o) => n.Count != o.Count);
    return difference.Any(x => x.Item1);
}

What it does is simple. First, it compares the old and the new set of Portfolios into portfolioChange. It utilizes the HasChanged function to check whether all the stocks of the two paired Portfolios are exactly the same. It then copies or creates Portfolio objects into newPortfolioHash. By leaving out the NeedsStop ones, they will not be calculated in the future, effectively stopping them. If they would hold some non-functional resource, like an output file or output connection, then we would have to stop the Portfolio by using something like the _connection.Unsubscribe() call later.

After this, we calculate the list of subscriptions which will be required once the transition to the new state finishes. We have to include all the stocks and their currencies as well, so we query it with SelectMany() and Union(). It is an interesting detail that the types of the two anonymous types are the same, that is why we can Union the two sequences together.

A very important advantage of the explicit transition is that we can create the precise list of subscribed portfolios into portfolioBySubscription, and in this way we can compare the two lists. In an imperative program we would have to write some kind of Subscribe function which would have to keep a list of subscribed Portfolio objects. When we would change the Portfolio objects it would have to replace their references. Of course all these changes would have to be locked, and we would have to decide whether to use simple or read-write locks to handle the fast path and the configuration changes efficiently. The other solution would be storing just the PortfolioId and looking up the Portfolio object for every packet. That would be not only slow, but would require locking and correctly updating the Dictionary as well. With the FP solution we can create the full list holding direct references to the appropriate Portfolio objects, and we can update all this atomically.

After determining the difference between the old and new sets of subscriptions we copy or create the subscriptions to the new configuration. In the process we Unsubscribe from the ones we do not need and then change from the old configuration to the new.

While it looks easy, there are things you should be aware of. First, the references must be updated. So if we have object A (Subscription) holding references to object B (Portfolio), then we cannot always copy object A, even if nothing has changed in the configuration of A, as it would still reference the old B instance if B was updated. That is why we use newPortfolioHash[x] when we copy the old Subscription instance data (at least the config and queue).

The second thing is that this code is not functional at all, so we have to handle ordering perfectly. If the data flow in the program is A->B, and so only A holds a reference to B, then we have to process the transition in the following way:

  1. Create/copy the new B instances
  2. Create/copy the new A instances. If we changed even one B we have a reference to, we have to create a new A.
  3. Stop A
  4. Stop B
  5. Transition
  6. Start B
  7. Start A

I do not want to suggest that writing this state transitioning code is easy, but at least we try. If we do not, then our software will have accidental behavior, good luck working with that. The bright side of this code arrangement is that you can now count the number of locks in our little sample. Good, is not it?

Final words 

I hope that I have demonstrated the advantages of Functional Programming in multi-threaded software, and also gave some insight into the reason we have to write the explicit state transitioning code. Remember, in order to be a successful Functional Programmer, you have to do the following three things:

  1. Be handsome! 
  2. Be attractive! 
  3. Do not be unattractive! 

Okay, there are more ways to become successful so just look at the code I have just shown you. I hope that you did not like the Sequential Temporal Composition design pattern. While it has it uses, and you should consider using it whenever there is a sequence of calculations which come in a chronological order, it resulted in the monstrous ConfigChange function which is barely understandable. How can we test this monster?

In the final part we will try making ConfigChange easier to grasp, maybe refactor it into smaller functions, while we bump into the limitations of C# Functional Programming. Ehh, let's just pretend that I did not say that and then you will find it completely unexpected. We can fix the code not to have the extra Dictionary lookups. We will also look at whether this concrete design is SOLID enough or not. Solid concrete, get is? We will also extend the code to see whether this concrete design is flexible enough or not. Flexible concrete, hilarious... I will tell you why we write virtual public functions for a class which has only one instance. And there will be some conclusion as well. 

Hmm, maybe I will be split it into two final parts. In the meantime you can find the two hidden design flaws left in the code...

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)

Share

About the Author

Andrew Rafas
Software Developer (Senior)
United Kingdom United Kingdom
I am a senior software developer with almost 20 years of experience. I have extensive knowledge of C#, C++ and Assembly languages, working mainly on Windows and embedded systems. Outside of work I am interested in a wider variety of technologies, including learning 20 programming languages, developing Linux kernel drivers or advocating Functional Programming recently.

Comments and Discussions

 
-- There are no messages in this forum --
| Advertise | Privacy | Mobile
Web01 | 2.8.140926.1 | Last Updated 6 Mar 2013
Article Copyright 2013 by Andrew Rafas
Everything else Copyright © CodeProject, 1999-2014
Terms of Service
Layout: fixed | fluid