Click here to Skip to main content
13,900,697 members
Click here to Skip to main content
Add your own
alternative version

Tagged as

Stats

911 views
Posted 14 Mar 2019
Licenced CPOL

Advanced Thread Pool

, 14 Mar 2019
Rate this:
Please Sign up or sign in to vote.
Advanced thread pool

Below is my implementation of the thread pool described in this talk and a benchmark comparing it against my simple thread pool implementation. The advanced pool is 15x faster at scheduling and dispatching short random length work items on my 2018 MacBook Pro with i5 CPU and 4 logical cores. It uses a queue per worker thread and a work stealing dispatcher. It tries to enqueue the work items onto a queue that is not currently locked by a dispatch thread. It also tries to steal work from other unblocked queues. As always, the complete implementation is available at GitHub.

Benchmark Program

#include <iostream>
#include <chrono>
#include <cstdlib>
#include "pool.h"
using namespace std;
using namespace chrono;

const unsigned int COUNT = 10'000'000;
const unsigned int REPS = 10;

int main()
{
	srand(0);
	auto start = high_resolution_clock::now();
	{
		simple_thread_pool tp;
		for(int i = 0; i < COUNT; ++i)
			tp.enqueue_work([i]() {
				int x;
				int reps = REPS + (REPS * (rand() % 5));
				for(int n = 0; n < reps; ++n)
					x = i + rand();
			});
	}
	auto end = high_resolution_clock::now();
	auto duration = duration_cast<milliseconds>(end - start);
	cout << "simple_thread_pool duration = " << duration.count() / 1000.f << " s" << endl;

	srand(0);
	start = high_resolution_clock::now();
	{
		thread_pool tp;
		for(int i = 0; i < COUNT; ++i)
			tp.enqueue_work([i]() {
				int x;
				int reps = REPS + (REPS * (rand() % 5));
				for(int n = 0; n < reps; ++n)
					x = i + rand();
			});
	}
	end = high_resolution_clock::now();
	duration = duration_cast<milliseconds>(end - start);
	cout << "thread_pool duration = " << duration.count() / 1000.f << " s" << endl;
}

Program output:

* Apple CLANG -Ofast -march=native -std=c++17 -lc++

simple_thread_pool duration = 30.337 s
thread_pool duration = 1.625 s

* LLVM -Ofast -march=native -std=c++17 -lc++

simple_thread_pool duration = 25.785 s
thread_pool duration = 1.615 s

* G++ -Ofast -march=native -std=c++17 -lstdc++ *

simple_thread_pool duration = 26.28 s
thread_pool duration = 1.614 s

thread_pool Class

class thread_pool
{
public:
	thread_pool(unsigned int threads = std::thread::hardware_concurrency())
	: m_queues(threads), m_count(threads)
	{
		assert(threads != 0);
		auto worker = [&](unsigned int i)
		{
			while(true)
			{
				Proc f;
				for(unsigned int n = 0; n < m_count; ++n)
					if(m_queues[(i + n) % m_count].try_pop(f)) break;
				if(!f && !m_queues[i].pop(f)) break;
				f();
			}
		};
		for(unsigned int i = 0; i < threads; ++i)
			m_threads.emplace_back(worker, i);
	}

	~thread_pool() noexcept
	{
		for(auto& queue : m_queues)
			queue.done();
		for(auto& thread : m_threads)
			thread.join();
	}

	template<typename F, typename... Args>
	void enqueue_work(F&& f, Args&&... args)
	{
		auto work = [f,args...]() { f(args...); };
		unsigned int i = m_index++;
		for(unsigned int n = 0; n < m_count * K; ++n)
			if(m_queues[(i + n) % m_count].try_push(work)) return;
		m_queues[i % m_count].push(work);
	}

	template<typename F, typename... Args>
	auto enqueue_task(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type>
	{
		using return_type = typename std::result_of<F(Args...)>::type;
		auto task = std::make_shared<std::packaged_task<return_type()>>
                         (std::bind(std::forward<F>(f), std::forward<Args>(args)...));
		std::future<return_type> res = task->get_future();

		auto work = [task](){ (*task)(); };
		unsigned int i = m_index++;
		for(unsigned int n = 0; n < m_count * K; ++n)
			if(m_queues[(i + n) % m_count].try_push(work)) return res;
		m_queues[i % m_count].push(work);

		return res;
	}

private:
	using Proc = std::function<void(void)>;
	using Queues = std::vector<simple_blocking_queue<Proc>>;
	Queues m_queues;

	using Threads = std::vector<std::thread>;
	Threads m_threads;

	const unsigned int m_count;
	std::atomic_uint m_index = 0;

	inline static const unsigned int K = 3;
};

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)

Share

About the Author

Martin Vorbrodt
Software Developer (Senior)
United States United States
No Biography provided

You may also be interested in...

Comments and Discussions

 
-- There are no messages in this forum --
Permalink | Advertise | Privacy | Cookies | Terms of Use | Mobile
Web03 | 2.8.190306.1 | Last Updated 15 Mar 2019
Article Copyright 2019 by Martin Vorbrodt
Everything else Copyright © CodeProject, 1999-2019
Layout: fixed | fluid