Posted 15 Mar 2019
Licenced MIT

Simple Thread Pool


I know the topic of thread pools has been beaten to death on the internet. Nevertheless, I want to present my implementation, which uses only standard C++ components. 🙂

I will be using the queue and semaphore classes discussed in my earlier posts.

Below, you will find a simple thread pool implementation that can be parameterized by the number of worker threads and the depth of the blocking queue of work items. Each thread waits on blocking_queue::pop() until a work item shows up. Which thread picks up a given work item is not deterministic: whichever thread is waiting takes the next item off the queue, executes it, then goes back to blocking_queue::pop(). Destruction and cleanup of threads is done with a nullptr sentinel pushed onto the queue. When a thread pops the sentinel, it pushes it back for the next thread and breaks out of its work loop. This way, during destruction of a pool instance, the threads are allowed to finish all unprocessed work items and every thread is joined.
Moreover, a work item can be any callable entity: a lambda, a functor, or a function pointer. A work item can accept any number of parameters, thanks to the template parameter pack of thread_pool::enqueue_work().

Update

Thank you to Reddit user sumo952 for bringing progschj/ThreadPool to my attention. I have updated my implementation to support futures, so the result of a work item can be retrieved through thread_pool::enqueue_task().

Usage Example

#include <iostream>
#include <mutex>
#include <thread>
#include <cstdlib>
#include <ctime>
#include "pool.h"
using namespace std;

mutex cout_lock;
#define trace(x) { scoped_lock<mutex> lock(cout_lock); cout << x << endl; }

const int COUNT = thread::hardware_concurrency();
const int WORK = 10'000'000;

int main(int argc, char** argv)
{
    srand((unsigned int)time(NULL));

    thread_pool pool;

    auto result = pool.enqueue_task([](int i) { return i; }, 0xFF);
    result.get();

    for(int i = 1; i <= COUNT; ++i)
        pool.enqueue_work([](int workerNumber) {
            unsigned int workOutput = 0; // unsigned, so the running sum wraps instead of overflowing
            int work = WORK + (rand() % (WORK));
            trace("work item " << workerNumber << " starting " << work << " iterations...");
            for(int w = 0; w < work; ++w)
                workOutput += rand();
            trace("work item " << workerNumber << " finished");
        }, i);

    return 1;
}

Program output:

work item 1 starting 170521507 iterations...
work item 2 starting 141859716 iterations...
work item 2 finished
work item 3 starting 189442810 iterations...
work item 1 finished
work item 4 starting 125609749 iterations...
work item 4 finished
work item 3 finished
Program ended with exit code: 1

pool.h

#pragma once

#include <vector>
#include <thread>
#include <memory>
#include <future>
#include <functional>
#include <type_traits>
#include <cassert>
#include "queue.h"

class thread_pool
{
public:
    thread_pool(
        unsigned int queueDepth = std::thread::hardware_concurrency(),
        size_t threads = std::thread::hardware_concurrency())
    : m_workQueue(queueDepth)
    {
        assert(queueDepth != 0);
        assert(threads != 0);
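        for(size_t i = 0; i < threads; ++i)
            m_threads.emplace_back([this]() {
                while(true)
                {
                    // Blocks until a work item (or the sentinel) is available.
                    auto workItem = m_workQueue.pop();
                    if(workItem == nullptr)
                    {
                        // Shutdown sentinel: pass it on to the next thread, then exit.
                        m_workQueue.push(nullptr);
                        break;
                    }
                    workItem();
                }
            });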
    }

    ~thread_pool() noexcept
    {
        // Queue the shutdown sentinel, then wait for every thread to exit.
        m_workQueue.push(nullptr);
        for(auto& thread : m_threads)
            thread.join();
    }

    using Proc = std::function<void(void)>;

    template<typename F, typename... Args>
    void enqueue_work(F&& f, Args&&... args) 
       noexcept(std::is_nothrow_invocable<decltype(&blocking_queue<Proc>::push<Proc&&>)>::value)
    {
        m_workQueue.push([=]() { f(args...); });
    }

    template<typename F, typename... Args>
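    auto enqueue_task(F&& f, Args&&... args) -> std::future<std::invoke_result_t<F, Args...>>
    {
        // std::invoke_result_t is the C++17 replacement for the deprecated std::result_of.
        using return_type = std::invoke_result_t<F, Args...>;
        // packaged_task is move-only; the shared_ptr keeps the lambda below copyable for Proc.
        auto task = std::make_shared<std::packaged_task<return_type()>>
                  (std::bind(std::forward<F>(f), std::forward<Args>(args)...));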
        std::future<return_type> res = task->get_future();
        m_workQueue.push([task](){ (*task)(); });
        return res;
    }

private:
    using ThreadPool = std::vector<std::thread>;
    ThreadPool m_threads;
    blocking_queue<Proc> m_workQueue;
};

License

This article, along with any associated source code and files, is licensed under The MIT License

About the Author

Martin Vorbrodt
Software Developer (Senior)
United States

Article Copyright 2019 by Martin Vorbrodt