Win32 Thread Pools and C++11 : A Quick Wrapper






Use the new Windows thread pool API through a single C++11 class
Part of my multithreading library: https://github.com/WindowsNT/mt
Introduction
Windows has a new Threadpool API with a somewhat messy interface (see example code). Here is a class that simplifies its usage.
Terminology
- A Work item is a callback that a pool worker thread runs in the background.
- An IO item is a callback that runs when an I/O operation on a Handle completes.
- A Timer item is a callback that fires after a set time.
- A Wait item is a callback that runs when some object is signaled.
- A Cleanup Group is the API's way of waiting for all the items and then releasing them.
Two Modes
template <bool AutoDestruct = true>
class tpool
The class is available in two modes: one whose destruction is handled by the API, and one whose destruction is handled through the class's smart pointers.
- Use with <true> when you want to use a Cleanup Group. This lets the API keep track of your objects, and with a simple Join(), you can wait for all of them.
- Use with <false> when you want to manage the items yourself. This allows you to call Join() with the items you want to wait for, cancel a specific item, etc.
Class Handle
template <typename T,typename Destruction = destruction_policy<T>>
class handle
This smart pointer manages the items. If you instantiate tpool with false, these handles automatically destroy their items (with true, the Cleanup Group releases them instead), with the aid of a template specialization for each type:
template<> class destruction_policy<PTP_POOL>
{ public: static void destruct(PTP_POOL h) { CloseThreadpool(h); } };
template<> class destruction_policy<PTP_WORK>
{ public: static void destruct(PTP_WORK h) { CloseThreadpoolWork(h); } };
template<> class destruction_policy<PTP_WAIT>
{ public: static void destruct(PTP_WAIT h) { CloseThreadpoolWait(h); } };
template<> class destruction_policy<PTP_TIMER>
{ public: static void destruct(PTP_TIMER h) { CloseThreadpoolTimer(h); } };
template<> class destruction_policy<PTP_IO>
{ public: static void destruct(PTP_IO h) { CloseThreadpoolIo(h); } };
template<> class destruction_policy<PTP_CLEANUP_GROUP>
{ public: static void destruct(PTP_CLEANUP_GROUP h) { CloseThreadpoolCleanupGroup(h); } };
The handle implements copy and move semantics with the aid of a std::shared_ptr.
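The policy idea above can be tried without any Windows headers. The following is a minimal sketch, not the article's class: `fake_item`, `close_count` and `shared_handle` are hypothetical stand-ins, and the policy is invoked from a std::shared_ptr deleter instead of the explicit Close() used in the final code below.

```cpp
#include <cassert>
#include <memory>
#include <type_traits>

// Hypothetical stand-in for an API resource such as PTP_WORK.
struct fake_item { int id; };

// Counts how often the stand-in "close" call ran.
inline int& close_count()
{
    static int count = 0;
    return count;
}

// Primary template is only declared; each resource type supplies its own policy.
template <typename T>
struct destruction_policy;

template <>
struct destruction_policy<fake_item*>
{
    static void destruct(fake_item* p) { ++close_count(); delete p; }
};

// Copyable handle: the last copy to go away runs the policy, unless the
// handle was created with no_destruct (something else owns the item).
template <typename T, typename Destruction = destruction_policy<T>>
class shared_handle
{
    static_assert(std::is_pointer<T>::value, "sketch assumes pointer handles");
    typedef typename std::remove_pointer<T>::type pointee;
    std::shared_ptr<pointee> ptr;
public:
    shared_handle() {}
    shared_handle(T h, bool no_destruct)
    {
        if (!h)
            return; // never hand a null handle to the policy
        if (no_destruct)
            ptr.reset(h, [](T) {});
        else
            ptr.reset(h, [](T p) { Destruction::destruct(p); });
    }
    operator T() const { return ptr.get(); }
};

// Small self-check: two copies share one item, which is closed exactly once.
inline void shared_handle_demo()
{
    const int before = close_count();
    {
        shared_handle<fake_item*> a(new fake_item{1}, false);
        shared_handle<fake_item*> b = a;
        assert(static_cast<fake_item*>(b)->id == 1);
        assert(close_count() == before);
    }
    assert(close_count() == before + 1);
}
```

Letting the deleter call the policy keeps copy and move at their compiler-generated defaults; the article's version tracks sharing with a separate std::shared_ptr<size_t> and closes the item by hand.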
tpool::Create
bool Create(unsigned long nmin = 1,unsigned long nmax = 1)
Creates the pool, the callback environment and, in auto-destruct mode, the Cleanup Group. You can pass the minimum and maximum number of threads you need; both default to 1.
Item Creation
There are four specializations to create your items:
template <> handle<PTP_WORK>
CreateItem<PTP_WORK,PTP_WORK_CALLBACK>(PTP_WORK_CALLBACK cb,PVOID opt,HANDLE);
template <> handle<PTP_WAIT>
CreateItem<PTP_WAIT,PTP_WAIT_CALLBACK>(PTP_WAIT_CALLBACK cb,PVOID opt,HANDLE);
template <> handle<PTP_TIMER>
CreateItem<PTP_TIMER,PTP_TIMER_CALLBACK>(PTP_TIMER_CALLBACK cb,PVOID opt,HANDLE);
template <> handle<PTP_IO>
CreateItem<PTP_IO,PTP_WIN32_IO_CALLBACK>(PTP_WIN32_IO_CALLBACK cb,PVOID opt,HANDLE hY);
Each call takes the callback to be executed and a parameter to pass to the callback. CreateItem<PTP_IO> also takes a HANDLE value for I/O operations.
Item Running
There are three specializations to run your items (a Wait object does not "run"). With the aid of std::tuple, we can use the same function signature with different parameters:
template <> void RunItem<PTP_WORK>(handle<PTP_WORK> h,std::tuple<>);
template <> void RunItem<PTP_TIMER>(handle<PTP_TIMER> h,std::tuple<FILETIME*,DWORD,DWORD>t);
template <> void RunItem<PTP_IO>(handle<PTP_IO> h,std::tuple<bool> t);
RunItem<PTP_TIMER> also takes the timer parameters (see SetThreadpoolTimer), and RunItem<PTP_IO> takes a bool: true to cancel the I/O or false to start it.
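The first timer parameter is the least obvious one: SetThreadpoolTimer reads a negative due time as an interval relative to now, in 100-nanosecond units. Below is a minimal sketch of building such a value. The helper names (`relative_due_time`, `split_due_time`, `make_relative_filetime`) and the `DueTimeParts` struct are hypothetical and not part of the class; only the last helper needs windows.h.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical stand-in for the two 32-bit halves of a Win32 FILETIME.
struct DueTimeParts
{
    std::uint32_t low;
    std::uint32_t high;
};

// Negative value = relative delay; 10,000 units of 100 ns make one millisecond.
constexpr std::int64_t relative_due_time(std::uint32_t milliseconds)
{
    return -static_cast<std::int64_t>(milliseconds) * 10000;
}

// Splits the 64-bit tick count into the low and high halves FILETIME expects.
inline DueTimeParts split_due_time(std::int64_t ticks)
{
    const std::uint64_t bits = static_cast<std::uint64_t>(ticks);
    DueTimeParts parts;
    parts.low = static_cast<std::uint32_t>(bits & 0xFFFFFFFFu);
    parts.high = static_cast<std::uint32_t>(bits >> 32);
    return parts;
}

#ifdef _WIN32
#include <windows.h>
// Builds a real FILETIME meaning "milliseconds from now".
inline FILETIME make_relative_filetime(std::uint32_t milliseconds)
{
    const DueTimeParts parts = split_due_time(relative_due_time(milliseconds));
    FILETIME ft;
    ft.dwLowDateTime = parts.low;
    ft.dwHighDateTime = parts.high;
    return ft;
}
#endif

// Small self-check: one second is ten million ticks, stored as a negative number.
inline void due_time_demo()
{
    static_assert(relative_due_time(1000) == -10000000, "1 s = 10^7 ticks");
    const DueTimeParts parts = split_due_time(relative_due_time(1000));
    assert(parts.low == 0xFF676980u);
    assert(parts.high == 0xFFFFFFFFu);
}
```

With the class described here, the call would presumably look like `t.RunItem<PTP_TIMER>(timer, std::tuple<FILETIME*,DWORD,DWORD>(&ft, 0, 0))` for a one-shot timer. Spelling out the tuple type appears to matter: `std::make_tuple(&ft, 0, 0)` yields a `std::tuple<FILETIME*,int,int>`, which does not match the specialization.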
Item Waiting
There are four specializations to wait for your items:
template <> void Wait<PTP_WORK>(handle<PTP_WORK> h,bool Cancel);
template <> void Wait<PTP_WAIT>(handle<PTP_WAIT> h,bool Cancel);
template <> void Wait<PTP_IO>(handle<PTP_IO> h,bool Cancel);
template <> void Wait<PTP_TIMER>(handle<PTP_TIMER> h,bool Cancel);
Passing true in Cancel cancels any queued callbacks of the item that have not started yet; callbacks that are already running are still waited for.
Joining
With the aid of type traits, we define the Join function in two ways, depending on whether tpool is instantiated with true or false.
template <bool Q = AutoDestruct>
typename std::enable_if<Q,void>::type
Join(bool Cancel = false);
With auto destruction, Join waits for all your current items to finish. If Cancel is true, all items that haven't yet started are cancelled.
template <bool Q = AutoDestruct>
typename std::enable_if<!Q,void>::type
Join(bool Cancel = false,
std::initializer_list<handle<PTP_WORK>> h1 = std::initializer_list<handle<PTP_WORK>>({}),
std::initializer_list<handle<PTP_TIMER>> h2 = std::initializer_list<handle<PTP_TIMER>>({}),
std::initializer_list<handle<PTP_WAIT>> h3 = std::initializer_list<handle<PTP_WAIT>>({}),
std::initializer_list<handle<PTP_IO>> h4 = std::initializer_list<handle<PTP_IO>>({})
);
With manual destruction, Join waits for the items you specify to finish. If Cancel is true, the callbacks of those items that haven't yet started are cancelled.
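The overload selection can be seen in isolation with a portable sketch. `join_demo` is a hypothetical stand-in for tpool, and its Join returns an int only so that the chosen overload is observable (the real Join returns void).

```cpp
#include <cassert>
#include <initializer_list>
#include <type_traits>

template <bool AutoDestruct = true>
class join_demo
{
public:
    // Viable only when AutoDestruct is true: there is no item list to pass.
    template <bool Q = AutoDestruct>
    typename std::enable_if<Q, int>::type
    Join(bool cancel = false)
    {
        (void)cancel;
        return -1; // stands for "wait on the whole cleanup group"
    }

    // Viable only when AutoDestruct is false: the caller names the items.
    template <bool Q = AutoDestruct>
    typename std::enable_if<!Q, int>::type
    Join(bool cancel = false, std::initializer_list<int> items = {})
    {
        (void)cancel;
        return static_cast<int>(items.size()); // stands for "wait on each item"
    }
};

// Small self-check: each instantiation exposes exactly one usable Join.
inline void join_overload_demo()
{
    assert(join_demo<true>().Join() == -1);
    assert(join_demo<false>().Join(true, {1, 2, 3}) == 3);
    assert(join_demo<false>().Join() == 0);
}
```

The extra template parameter Q is what makes the condition dependent: std::enable_if is evaluated when Join is called, so the unsuitable overload drops out of overload resolution instead of breaking the class instantiation.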
The Final Code
#include <windows.h>
#include <functional>
#include <initializer_list>
#include <memory>
#include <tuple>
#include <type_traits>
#include <utility>
// -------------------------
namespace tpoollib
{
// Handles template
// Destruction Policy
template<typename T>
class destruction_policy
{
public:
static void destruct(T h)
{
// Dependent condition: fires only if this primary template is instantiated
static_assert(sizeof(T) == 0,"Must define destructor");
}
};
// Policies Specialization
template<> class destruction_policy<PTP_POOL>
{ public: static void destruct(PTP_POOL h) { CloseThreadpool(h); } };
template<> class destruction_policy<PTP_WORK>
{ public: static void destruct(PTP_WORK h) { CloseThreadpoolWork(h); } };
template<> class destruction_policy<PTP_WAIT>
{ public: static void destruct(PTP_WAIT h) { CloseThreadpoolWait(h); } };
template<> class destruction_policy<PTP_TIMER>
{ public: static void destruct(PTP_TIMER h) { CloseThreadpoolTimer(h); } };
template<> class destruction_policy<PTP_IO>
{ public: static void destruct(PTP_IO h) { CloseThreadpoolIo(h); } };
template<> class destruction_policy<PTP_CLEANUP_GROUP>
{ public: static void destruct(PTP_CLEANUP_GROUP h)
{ CloseThreadpoolCleanupGroup(h); } };
// Template for Handles
template <typename T,typename Destruction = destruction_policy<T>>
class handle
{
private:
T hX = 0;
bool NoDestruct = true;
std::shared_ptr<size_t> ptr = std::make_shared<size_t>();
public:
// Closing items
void Close()
{
if (!ptr || ptr.use_count() != 1) // unique() was removed in C++20
{
ptr.reset();
return;
}
ptr.reset();
if (hX != 0 && !NoDestruct)
Destruction::destruct(hX);
hX = 0;
}
handle()
{
hX = 0;
}
~handle()
{
Close();
}
handle(const handle& h)
{
Dup(h);
}
handle(handle&& h)
{
Move(std::move(h));
}
handle(T hY,bool NoDestructOnClose)
{
hX = hY;
NoDestruct = NoDestructOnClose;
}
handle& operator =(const handle& h)
{
Dup(h);
return *this;
}
handle& operator =(handle&& h)
{
Move(std::move(h));
return *this;
}
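void Dup(const handle& h)
{
if (this == &h)
return; // self-assignment: Close() would destroy the item we are about to copy
Close();
NoDestruct = h.NoDestruct;
hX = h.hX;
ptr = h.ptr;
}
void Move(handle&& h)
{
if (this == &h)
return; // self-move: nothing to transfer
Close();
hX = h.hX;
ptr = h.ptr;
NoDestruct = h.NoDestruct;
h.ptr.reset();
h.hX = 0;
h.NoDestruct = false;
}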
operator T() const
{
return hX;
}
};
template <bool AutoDestruct = true>
class tpool
{
private:
handle<PTP_POOL> p;
handle<PTP_CLEANUP_GROUP> pcg;
TP_CALLBACK_ENVIRON env;
tpool(const tpool&) = delete;
tpool(tpool&&) = delete;
void operator=(const tpool&) = delete;
void operator=(tpool&&) = delete;
public:
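tpool()
{
// Initialize here as well, so that End() never sees an uninitialized
// environment when Create() was not called
InitializeThreadpoolEnvironment(&env);
}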
~tpool()
{
End();
}
void End()
{
Join();
DestroyThreadpoolEnvironment(&env);
p.Close();
}
// Creates the interfaces
bool Create(unsigned long nmin = 1,unsigned long nmax = 1)
{
bool jauto = AutoDestruct;
// Env
InitializeThreadpoolEnvironment(&env);
// Pool and Min/Max
handle<PTP_POOL> cx(CreateThreadpool(0),false);
p = cx;
if (!p)
{
End();
return false;
}
if (!SetThreadpoolThreadMinimum(p,nmin))
{
End();
return false;
}
SetThreadpoolThreadMaximum(p,nmax);
// Cleanup Group
if (jauto)
{
handle<PTP_CLEANUP_GROUP> cx(CreateThreadpoolCleanupGroup(),false);
pcg = cx;
if (!pcg)
{
End();
return false;
}
}
// Sets
SetThreadpoolCallbackPool(&env,p);
SetThreadpoolCallbackCleanupGroup(&env,pcg,0);
return true;
}
// Templates for each of the items, to be specialized later
template <typename J>
void Wait(handle<J> h,bool Cancel = false)
{
static_assert(sizeof(J) == 0,"No Wait function is defined");
}
template <typename J,typename CB_J>
handle<J> CreateItem(CB_J cb,PVOID opt = 0,HANDLE hX = 0)
{
static_assert(sizeof(J) == 0,"No Create function is defined");
}
template <typename J,typename ...A>
void RunItem(handle<J> h,std::tuple<A...> = std::make_tuple<>())
{
static_assert(sizeof(J) == 0,"No Run function is defined");
}
// Work Stuff
template <> handle<PTP_WORK> CreateItem<PTP_WORK,PTP_WORK_CALLBACK>
(PTP_WORK_CALLBACK cb,PVOID opt,HANDLE)
{
handle<PTP_WORK> a(CreateThreadpoolWork(cb,opt,&env),AutoDestruct);
return a;
}
template <> void RunItem<PTP_WORK>(handle<PTP_WORK> h,std::tuple<>)
{
SubmitThreadpoolWork(h);
}
template <> void Wait<PTP_WORK>(handle<PTP_WORK> h,bool Cancel)
{
WaitForThreadpoolWorkCallbacks(h,Cancel);
}
// Wait stuff
template <> handle<PTP_WAIT> CreateItem<PTP_WAIT,PTP_WAIT_CALLBACK>
(PTP_WAIT_CALLBACK cb,PVOID opt,HANDLE)
{
handle<PTP_WAIT> a(CreateThreadpoolWait(cb,opt,&env),AutoDestruct);
return a;
}
template <> void Wait<PTP_WAIT>(handle<PTP_WAIT> h,bool Cancel)
{
WaitForThreadpoolWaitCallbacks(h,Cancel);
}
// Timer stuff
template <> handle<PTP_TIMER> CreateItem<PTP_TIMER,PTP_TIMER_CALLBACK>
(PTP_TIMER_CALLBACK cb,PVOID opt,HANDLE)
{
handle<PTP_TIMER> a(CreateThreadpoolTimer(cb,opt,&env),AutoDestruct);
return a;
}
template <> void RunItem<PTP_TIMER>(handle<PTP_TIMER> h,
std::tuple<FILETIME*,DWORD,DWORD>t)
{
SetThreadpoolTimer(h,std::get<0>(t),std::get<1>(t),std::get<2>(t));
}
template <> void Wait<PTP_TIMER>(handle<PTP_TIMER> h,bool Cancel)
{
WaitForThreadpoolTimerCallbacks(h,Cancel);
}
// IO Stuff
template <> handle<PTP_IO> CreateItem<PTP_IO,PTP_WIN32_IO_CALLBACK>
(PTP_WIN32_IO_CALLBACK cb,PVOID opt,HANDLE hY)
{
handle<PTP_IO> a(CreateThreadpoolIo(hY,cb,opt,&env),AutoDestruct);
return a;
}
template <> void RunItem<PTP_IO>(handle<PTP_IO> h,std::tuple<bool> t)
{
bool Cancel = std::get<0>(t);
if (Cancel)
CancelThreadpoolIo(h);
else
StartThreadpoolIo(h);
}
template <> void Wait<PTP_IO>(handle<PTP_IO> h,bool Cancel)
{
WaitForThreadpoolIoCallbacks(h,Cancel);
}
// Join functions, one for each option (AutoDestruct or not)
template <bool Q = AutoDestruct>
typename std::enable_if<Q,void>::type
Join(bool Cancel = false)
{
if (pcg)
{
CloseThreadpoolCleanupGroupMembers(pcg,Cancel,0);
pcg.Close();
}
}
template <bool Q = AutoDestruct>
typename std::enable_if<!Q,void>::type
Join(bool Cancel = false,
std::initializer_list<handle<PTP_WORK>> h1 =
std::initializer_list<handle<PTP_WORK>>({}),
std::initializer_list<handle<PTP_TIMER>> h2 =
std::initializer_list<handle<PTP_TIMER>>({}),
std::initializer_list<handle<PTP_WAIT>> h3 =
std::initializer_list<handle<PTP_WAIT>>({}),
std::initializer_list<handle<PTP_IO>> h4 =
std::initializer_list<handle<PTP_IO>>({})
)
{
for (auto a : h1)
Wait<PTP_WORK>(a,Cancel);
for (auto a : h2)
Wait<PTP_TIMER>(a,Cancel);
for (auto a : h3)
Wait<PTP_WAIT>(a,Cancel);
for (auto a : h4)
Wait<PTP_IO>(a,Cancel);
}
};
}
Sample Usage
Error checking is removed for simplicity.
using namespace tpoollib;
int __stdcall WinMain(HINSTANCE, HINSTANCE, LPSTR, int)
{
CoInitializeEx(0, COINIT_APARTMENTTHREADED);
// Auto-Destruction of items
{
tpool<true> t;
t.Create();
auto workit = t.CreateItem<PTP_WORK,PTP_WORK_CALLBACK>
([] (PTP_CALLBACK_INSTANCE,PVOID,PTP_WORK)
{
Sleep(((rand() % 5) + 1) * 1000);
return;
},0);
for (int i = 0; i < 3; i++)
t.RunItem(workit);
t.Join();
}
// Manual Destruction of items
{
tpool<false> t;
t.Create();
auto workit = t.CreateItem<PTP_WORK,PTP_WORK_CALLBACK>
([] (PTP_CALLBACK_INSTANCE,PVOID,PTP_WORK)
{
Sleep(((rand() % 5) + 1) * 1000);
return;
},0);
for (int i = 0; i < 3; i++)
t.RunItem(workit);
t.Join(true,{workit});
}
return 0;
}
History
- 26th July, 2015 - First release