Click here to Skip to main content
13,800,290 members
Click here to Skip to main content
Add your own
alternative version

Stats

10.8K views
23 bookmarked
Posted 25 Jul 2015
Licenced CPOL

Win32 Thread Pools and C++11 : A quick wrapper

, 25 Jul 2015
Rate this:
Please Sign up or sign in to vote.
Use Windows new ThreadPool through a single C++ 11 class.

Introduction

Windows has a new Threadpool API with a somewhat messy interface (See example code). Here is a class that simplifies its usage.

Terminology

  • A Work item is a worker thread that can run in the background.
  • An IO item is a worker thread that gets notified when I/O in a Handle occurs.
  • A Timer item is a worker thread that fires after a time set.
  • A Wait item is a worker thead that gets notified when some object is signaled.
  • Cleanup Group is the API way to release all the handles after waiting for them.

Two modes

template <bool AutoDestruct = true>
class tpool

The class is available in two modes, one that the destruction is handled by the API, and one with the destruction is handled through the class smart pointers.

  • Use with <true> when you want to use a Cleanup Group. This lets the API keep a track of your objects, and with a simple Join() you can wait for all of them.
  • Use with <false> when you want to manage the items yourself. This allows you to call Join() with the items you want to wait, cancel a specific item etc.

 

Class handle

template <typename T,typename Destruction = destruction_policy<T>>
class handle

This smart pointer manages the items. If you instantiate tpool with true, these handles automatically destroy their items, with the aid of template specialization for each type:

template<>    class destruction_policy<PTP_POOL> { public: static void destruct(PTP_POOL h) { CloseThreadpool(h); } };
template<>    class destruction_policy<PTP_WORK> { public: static void destruct(PTP_WORK h) { CloseThreadpoolWork(h); } };
template<>    class destruction_policy<PTP_WAIT> { public: static void destruct(PTP_WAIT h) { CloseThreadpoolWait(h); } };
template<>    class destruction_policy<PTP_TIMER> { public: static void destruct(PTP_TIMER h) { CloseThreadpoolTimer(h); } };
template<>    class destruction_policy<PTP_IO> { public: static void destruct(PTP_IO h) { CloseThreadpoolIo(h); } };
template<>    class destruction_policy<PTP_CLEANUP_GROUP> { public: static void destruct(PTP_CLEANUP_GROUP h) { CloseThreadpoolCleanupGroup(h); } };

The handle implements move/copy semantics etc with the aid of a std::shared_ptr.

 

tpool::Create

bool Create(unsigned long nmin = 1,unsigned long nmax = 1)

Creates the interfaces needed. You can pass the minimum and maximum number of threads you need.

Item Creation

There are four specializations to create your items:

template <> handle<PTP_WORK> CreateItem<PTP_WORK,PTP_WORK_CALLBACK>(PTP_WORK_CALLBACK cb,PVOID opt,HANDLE);
template <> handle<PTP_WAIT> CreateItem<PTP_WAIT,PTP_WAIT_CALLBACK>(PTP_WAIT_CALLBACK cb,PVOID opt,HANDLE);
template <> handle<PTP_TIMER> CreateItem<PTP_TIMER,PTP_TIMER_CALLBACK>(PTP_TIMER_CALLBACK cb,PVOID opt,HANDLE);
template <> handle<PTP_IO> CreateItem<PTP_IO,PTP_WIN32_IO_CALLBACK>(PTP_WIN32_IO_CALLBACK cb,PVOID opt,HANDLE hY);

Each call takes the callback to be executed, a parameter to pass to the callback. CreateItem<PTP_IO> also takes a HANDLE value for I/O operations.

 

Item Running

There are three specializations to run your items (A Wait object does not "run"). With the aid of std::tuple we can use the same function signatures with different parameters:

template <> void RunItem<PTP_WORK>(handle<PTP_WORK> h,std::tuple<>);
template <> void RunItem<PTP_TIMER>(handle<PTP_TIMER> h,std::tuple<FILETIME*,DWORD,DWORD>t);
template <> void RunItem<PTP_IO>(handle<PTP_IO> h,std::tuple<bool> t);

 

RunItem<PTP_TIMER> also takes the timer parameters (See SetThreadpoolTimer) and RunItem<PTP_IO> takes a bool, true in order to cancle the I/O or false to start it.

 

 

 

Item Waiting

There are four specializations to wait for your items:

template <> void Wait<PTP_WORK>(handle<PTP_WORK> h,bool Cancel);
template <> void Wait<PTP_WAIT>(handle<PTP_WAIT> h,bool Cancel);
template <> void Wait<PTP_IO>(handle<PTP_IO> h,bool Cancel);
template <> void Wait<PTP_TIMER>(handle<PTP_TIMER> h,bool Cancel);

Passing true in Cancel forces the function to kill the object if it hasn't started yet.

 

Joining

With the aid of type traits, we define the join function in two ways, depending on if tpool is instantiated with true or false.

template <bool Q = AutoDestruct>
typename std::enable_if<Q,void>::type
Join(bool Cancel = false);

For the case that you have auto destruction, Join waits for all your current items to finish. If  Cancel is true then all items that haven't yet started are cancelled.

            template <bool Q = AutoDestruct>
            typename std::enable_if<!Q,void>::type
                Join(bool Cancel = false,
                std::initializer_list<handle<PTP_WORK>> h1 = std::initializer_list<handle<PTP_WORK>>({}),
                std::initializer_list<handle<PTP_TIMER>> h2 = std::initializer_list<handle<PTP_TIMER>>({}),
                std::initializer_list<handle<PTP_WAIT>> h3 = std::initializer_list<handle<PTP_WAIT>>({}),
                std::initializer_list<handle<PTP_IO>> h4 = std::initializer_list<handle<PTP_IO>>({})
                );

For the case that you have manual destruction, Join waits for the items you specify to finish. If  Cancel is true then all items that haven't yet started are cancelled.

 

The final code

#include <windows.h>
#include <functional>
#include <memory>
#include <type_traits>

// -------------------------
namespace tpoollib
    {
    // Handles template
    
    // Destruction Policy    
    template<typename T>
    class destruction_policy
        {
        public:
            static void destruct(T h)
                {
                static_assert(false,"Must define destructor");
                }
        };

    // Policies Specialization
    template<>    class destruction_policy<PTP_POOL> { public: static void destruct(PTP_POOL h) { CloseThreadpool(h); } };
    template<>    class destruction_policy<PTP_WORK> { public: static void destruct(PTP_WORK h) { CloseThreadpoolWork(h); } };
    template<>    class destruction_policy<PTP_WAIT> { public: static void destruct(PTP_WAIT h) { CloseThreadpoolWait(h); } };
    template<>    class destruction_policy<PTP_TIMER> { public: static void destruct(PTP_TIMER h) { CloseThreadpoolTimer(h); } };
    template<>    class destruction_policy<PTP_IO> { public: static void destruct(PTP_IO h) { CloseThreadpoolIo(h); } };
    template<>    class destruction_policy<PTP_CLEANUP_GROUP> { public: static void destruct(PTP_CLEANUP_GROUP h) { CloseThreadpoolCleanupGroup(h); } };
    

    // Template for Handles
    template <typename T,typename Destruction = destruction_policy<T>>
    class handle
        {
        private:
            T hX = 0;
            bool NoDestruct = true;
            std::shared_ptr<size_t> ptr = std::make_shared<size_t>();

        public:

            // Closing items
            void Close()
                {
                if (!ptr || !ptr.unique())
                    {
                    ptr.reset();
                    return;
                    }
                ptr.reset();
                if (hX != 0 && !NoDestruct)
                    Destruction::destruct(hX);
                hX = 0;
                }

            handle()
                {
                hX = 0;
                }
            ~handle()
                {
                Close();
                }
            handle(const handle& h)
                {
                Dup(h);
                }
            handle(handle&& h)
                {
                Move(std::forward<handle>(h));
                }
            handle(T hY,bool NoDestructOnClose)
                {
                hX = hY;
                NoDestruct = NoDestructOnClose;
                }

            handle& operator =(const handle& h)
                {
                Dup(h);
                return *this;
                }
            handle& operator =(handle&& h)
                {
                Move(std::forward<handle>(h));
                return *this;
                }

            void Dup(const handle& h)
                {
                Close();
                NoDestruct = h.NoDestruct;
                hX = h.hX;
                ptr = h.ptr;
                }
            void Move(handle&& h)
                {
                Close();
                hX = h.hX;
                ptr = h.ptr;
                NoDestruct = h.NoDestruct;
                h.ptr.reset();
                h.hX = 0;
                h.NoDestruct = false;
                }
            operator T() const
                {
                return hX;
                }

        };

    template <bool AutoDestruct = true>
    class tpool
        {
        private:
            handle<PTP_POOL> p;
            handle<PTP_CLEANUP_GROUP> pcg;
            TP_CALLBACK_ENVIRON env;

            tpool(const tpool&) = delete;
            tpool(tpool&&) = delete;
            void operator=(const tpool&) = delete;
            void operator=(tpool&&) = delete;

        public:

            tpool()
                {
                }

            ~tpool()
                {
                End();
                }

            void End()
                {
                Join();
                DestroyThreadpoolEnvironment(&env);
                p.Close();
                }

            // Creates the interfaces
            bool Create(unsigned long nmin = 1,unsigned long nmax = 1)
                { 
                bool jauto = AutoDestruct;

                // Env
                InitializeThreadpoolEnvironment(&env);

                // Pool and Min/Max
                handle<PTP_POOL> cx(CreateThreadpool(0),false);
                p = cx;
                if (!p)
                    {
                    End();
                    return false;
                    }
                if (!SetThreadpoolThreadMinimum(p,nmin))
                    {
                    End();
                    return false;
                    }
                SetThreadpoolThreadMaximum(p,nmax);

                // Cleanup Group
                if (jauto)
                    {
                    handle<PTP_CLEANUP_GROUP> cx(CreateThreadpoolCleanupGroup(),false);
                    pcg = cx;
                    if (!pcg)
                        {
                        End();
                        return false;
                        }
                    }

                // Sets
                SetThreadpoolCallbackPool(&env,p);
                SetThreadpoolCallbackCleanupGroup(&env,pcg,0);

                return true;
                }

            // Templates for each of the items, to be specialized later
            template <typename J>
            void Wait(handle<J> h,bool Cancel = false)
                {
                static_assert(false,"No Wait function is defined");
                }
            template <typename J,typename CB_J>
            handle<J> CreateItem(CB_J cb,PVOID opt = 0,HANDLE hX = 0)
                {
                static_assert(false,"No Create function is defined");
                }
            template <typename J,typename ...A>
            void RunItem(handle<J> h,std::tuple<A...> = std::make_tuple<>())
                {
                static_assert(false,"No Run function is defined");
                }

            // Work Stuff
            template <> handle<PTP_WORK> CreateItem<PTP_WORK,PTP_WORK_CALLBACK>(PTP_WORK_CALLBACK cb,PVOID opt,HANDLE)
                {
                handle<PTP_WORK> a(CreateThreadpoolWork(cb,opt,&env),AutoDestruct);
                return a;
                }
            template <> void RunItem<PTP_WORK>(handle<PTP_WORK> h,std::tuple<>)
                {
                SubmitThreadpoolWork(h);
                }
            template <> void Wait<PTP_WORK>(handle<PTP_WORK> h,bool Cancel)
                {
                WaitForThreadpoolWorkCallbacks(h,Cancel);
                }

            // Wait  stuff
            template <> handle<PTP_WAIT> CreateItem<PTP_WAIT,PTP_WAIT_CALLBACK>(PTP_WAIT_CALLBACK cb,PVOID opt,HANDLE)
                {
                handle<PTP_WAIT> a(CreateThreadpoolWait(cb,opt,&env),AutoDestruct);
                return a;
                }
            template <> void Wait<PTP_WAIT>(handle<PTP_WAIT> h,bool Cancel)
                {
                WaitForThreadpoolWaitCallbacks(h,Cancel);
                }

            // Timer stuff
            template <> handle<PTP_TIMER> CreateItem<PTP_TIMER,PTP_TIMER_CALLBACK>(PTP_TIMER_CALLBACK cb,PVOID opt,HANDLE)
                {
                handle<PTP_TIMER> a(CreateThreadpoolTimer(cb,opt,&env),AutoDestruct);
                return a;
                }
            template <> void RunItem<PTP_TIMER>(handle<PTP_TIMER> h,std::tuple<FILETIME*,DWORD,DWORD>t)
                {
                SetThreadpoolTimer(h,std::get<0>(t),std::get<1>(t),std::get<2>(t));
                }
            template <> void Wait<PTP_TIMER>(handle<PTP_TIMER> h,bool Cancel)
                {
                WaitForThreadpoolTimerCallbacks(h,Cancel);
                }

            // IO Stuff
            template <> handle<PTP_IO> CreateItem<PTP_IO,PTP_WIN32_IO_CALLBACK>(PTP_WIN32_IO_CALLBACK cb,PVOID opt,HANDLE hY)
                {
                handle<PTP_IO> a(CreateThreadpoolIo(hY,cb,opt,&env),AutoDestruct);
                return a;
                }
            template <> void RunItem<PTP_IO>(handle<PTP_IO> h,std::tuple<bool> t)
                {
                bool Cancel = std::get<0>(t);
                if (Cancel)
                    CancelThreadpoolIo(h);
                else
                    StartThreadpoolIo(h);
                }
            template <> void Wait<PTP_IO>(handle<PTP_IO> h,bool Cancel)
                {
                WaitForThreadpoolIoCallbacks(h,Cancel);
                }

            // Join functions, one for each option (AutoDestruct or not)
            template <bool Q = AutoDestruct>
            typename std::enable_if<Q,void>::type
            Join(bool Cancel = false)
                {
                if (pcg)
                    {
                    CloseThreadpoolCleanupGroupMembers(pcg,Cancel,0);
                    pcg.Close();
                    }
                }

            template <bool Q = AutoDestruct>
            typename std::enable_if<!Q,void>::type
                Join(bool Cancel = false,
                std::initializer_list<handle<PTP_WORK>> h1 = std::initializer_list<handle<PTP_WORK>>({}),
                std::initializer_list<handle<PTP_TIMER>> h2 = std::initializer_list<handle<PTP_TIMER>>({}),
                std::initializer_list<handle<PTP_WAIT>> h3 = std::initializer_list<handle<PTP_WAIT>>({}),
                std::initializer_list<handle<PTP_IO>> h4 = std::initializer_list<handle<PTP_IO>>({})
                )
                {
                for (auto a : h1)
                    Wait<PTP_WORK>(a,Cancel);
                for (auto a : h2)
                    Wait<PTP_TIMER>(a,Cancel);
                for (auto a : h3)
                    Wait<PTP_WAIT>(a,Cancel);
                for (auto a : h4)
                    Wait<PTP_IO>(a,Cancel);
                }

        };

    }

 

Sample Usage

Error checking is removed for simplicity.

using namespace tpoollib;
int __stdcall WinMain(HINSTANCE, HINSTANCE, LPSTR, int)
    {
    CoInitializeEx(0, COINIT_APARTMENTTHREADED);
    

    // Auto-Destruction of items
        {
        tpool<true> t;
        t.Create();
        auto workit = t.CreateItem<PTP_WORK,PTP_WORK_CALLBACK>([] (PTP_CALLBACK_INSTANCE,PVOID,PTP_WORK)
            {
            Sleep(((rand() % 5) + 1) * 1000);
            return;
            },0);
        for (int i = 0; i < 3; i++)
            t.RunItem(workit);
        t.Join();
        }

    // Manual Destruction of items
        {
        tpool<false> t;
        t.Create();
        auto workit = t.CreateItem<PTP_WORK,PTP_WORK_CALLBACK>([] (PTP_CALLBACK_INSTANCE,PVOID,PTP_WORK)
            {
            Sleep(((rand() % 5) + 1) * 1000);
            return;
            },0);
        for (int i = 0; i < 3; i++)
            t.RunItem(workit);
        t.Join(true,{workit});
        }

    return 0;
}

 

History

  • 26 JUL 2015 - First release.

 

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)

Share

About the Author

Michael Chourdakis
Engineer
Greece Greece
I'm working in C++, PHP , Java, Windows, iOS and Android.

I 've a PhD in Digital Signal Processing and Artificial Intelligence and I specialize in Pro Audio and AI applications.

My home page: http://www.michaelchourdakis.com

You may also be interested in...

Comments and Discussions

 
-- There are no messages in this forum --
Permalink | Advertise | Privacy | Cookies | Terms of Use | Mobile
Web03 | 2.8.181214.1 | Last Updated 26 Jul 2015
Article Copyright 2015 by Michael Chourdakis
Everything else Copyright © CodeProject, 1999-2018
Layout: fixed | fluid