C++ thread pool


A thread pool is a well-known design pattern for task execution in a multithreaded environment. Today we're not going to reinvent the wheel and write a thread pool in C++ from scratch. Instead, we're going to take an existing implementation of a thread pool and enhance it with additional features.

Boost.Asio thread pool

There is a very basic thread pool implementation in the Boost.Asio library. All you need is to include the proper headers and you're all set:

#include <iostream>
#include <mutex>
#include <boost/asio/post.hpp>
#include <boost/asio/thread_pool.hpp>

std::mutex output_mutex;

int main()
{
    boost::asio::thread_pool workers(16);

    for(int n = 0; n < 16; ++n)
    {
        boost::asio::post(workers, [=]
        {
            auto guard = std::lock_guard(output_mutex);
            std::cout << "Hello from task " << n << "!\n";
        });
    }

    workers.wait();

    return 0;
}

The boost::asio::thread_pool::wait member function appeared in Boost version 1.74. If you're using an earlier version of the Boost library, you can replace wait with join, which also blocks until the posted tasks have finished and the worker threads have exited.

So, let's start writing our enhanced thread pool in C++ on top of the Boost.Asio one. Assume we put our thread_pool class into a thread_pool.hpp header:

#include <cstddef>
#include <boost/asio/post.hpp>
#include <boost/asio/thread_pool.hpp>

class thread_pool
{
public:

    thread_pool(std::size_t threads)
    : workers(threads)
    {
    }

    template <typename F>
    void post(F&& f)
    {
        boost::asio::post(workers, std::forward<F>(f));
    }

    void wait()
    {
        workers.wait();
    }

private:

    boost::asio::thread_pool workers;
};
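
Usage is the same as with the raw Boost.Asio pool; the first example rewritten against our wrapper looks like this:

#include <iostream>
#include <mutex>
#include "thread_pool.hpp"

std::mutex output_mutex;

int main()
{
    thread_pool workers(16);

    for(int n = 0; n < 16; ++n)
    {
        workers.post([=]
        {
            auto guard = std::lock_guard(output_mutex);
            std::cout << "Hello from task " << n << "!\n";
        });
    }

    workers.wait();

    return 0;
}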

This is a start. Now, let's add some features.

Task's return value

As you can see, the post function just schedules a given task for execution, and no feedback is provided. So, the first thing we might want to add is some feedback. There are different ways to obtain a task's return value, and the simplest one is to use futures and promises. Sometimes you don't need any return value or completion indicator, so we'll leave the post function as it is for such cases. For dealing with return values we will add a submit function which provides feedback:

#include <future>
#include <boost/asio/post.hpp>
#include <boost/asio/thread_pool.hpp>

class thread_pool
{
public:

    // ...

    template <typename F>
    void post(F&& f)
    {
        boost::asio::post(workers, std::forward<F>(f));
    }

    template <typename F>
    auto submit(F&& f) -> std::future<decltype(f())>
    {
        std::promise<decltype(f())> promise;
        auto future = promise.get_future();
        boost::asio::post(workers, [promise = std::move(promise), f = std::forward<F>(f)] () mutable
        {
            promise.set_value(f());
        });
        return future;
    }

    // ...
};
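
One caveat: as written, submit only compiles for tasks that actually return a value, because promise.set_value(f()) is ill-formed when decltype(f()) is void. A minimal sketch of one way to support void tasks as well, using if constexpr (C++17):

#include <type_traits>

// ...

    template <typename F>
    auto submit(F&& f) -> std::future<decltype(f())>
    {
        std::promise<decltype(f())> promise;
        auto future = promise.get_future();
        boost::asio::post(workers, [promise = std::move(promise), f = std::forward<F>(f)] () mutable
        {
            if constexpr (std::is_void_v<decltype(f())>)
            {
                // No value to pass through; just signal completion
                f();
                promise.set_value();
            }
            else
            {
                promise.set_value(f());
            }
        });
        return future;
    }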

Now we can use std::future to get the asynchronous result of a scheduled task:

#include <iostream>
#include "thread_pool.hpp"

int main()
{
    thread_pool workers(16);

    // val is std::future<char const*>
    auto val = workers.submit([]
    {
        return "Hello from the thread pool!";
    });

    std::cout << val.get() << "\n";

    return 0;
}
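
With the void-aware submit sketched above, the returned std::future<void> can also serve as a pure completion signal:

auto done = workers.submit([]
{
    // Some side-effecting work with no return value
});

done.wait(); // blocks until the task has completed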

Task queue size limit

Sometimes you need to limit the size of the task queue, for example, when tasks acquire some expensive resource or just allocate a lot of memory. A very simple approach is to use a semaphore. If you use C++20, you can utilize std::counting_semaphore for that. If you're still on an earlier version of C++ and need a cross-platform solution, you can use the semaphore from the Boost.Interprocess library. We'll go with the second option; in any case, the two are easily interchangeable. First, let's put a semaphore into the private section of our class:

#include <boost/interprocess/sync/interprocess_semaphore.hpp>

// ...

private:

    boost::asio::thread_pool workers;
    boost::interprocess::interprocess_semaphore queue;
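
For reference, a C++20 sketch of the same member using std::counting_semaphore, whose acquire and release calls correspond to the interprocess semaphore's wait and post:

#include <semaphore>

// ...

private:

    boost::asio::thread_pool workers;
    std::counting_semaphore<> queue; // acquire() ~ wait(), release() ~ post()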

Next, initialize it with a given queue size limit:

thread_pool(std::size_t threads, std::size_t capacity)
: workers(threads)
, queue  (capacity)
{
}

And finally, apply the semaphore to limit the queue size:

template <typename F>
void post(F&& f)
{
    queue.wait();

    boost::asio::post(workers, [this, f = std::forward<F>(f)]
    {
        f();
        queue.post();
    });
}

template <typename F>
auto submit(F&& f) -> std::future<decltype(f())>
{
    queue.wait();

    std::promise<decltype(f())> promise;
    auto future = promise.get_future();
    boost::asio::post(workers, [this, promise = std::move(promise), f = std::forward<F>(f)] () mutable
    {
        promise.set_value(f());
        queue.post();
    });
    return future;
}

And here's how it works: every time you schedule a task, the semaphore counter is decremented. Once it reaches zero, the next post or submit call will block until some task completes and the semaphore counter is incremented again:

#include <iostream>
#include <mutex>
#include "thread_pool.hpp"

std::mutex output_mutex;

int main()
{
    std::size_t threads = 2, capacity = 6;

    thread_pool workers(threads, capacity);

    for(int n = 0; n < 16; ++n)
    {
        // This will block if there are 6 tasks in the queue already
        workers.post([=]
        {
            auto guard = std::lock_guard(output_mutex);
            std::cout << "Hello from task " << n << "!\n";
        });
    }

    // Wait for completion: otherwise tasks still sitting in the queue
    // may be dropped when the pool is stopped on destruction
    workers.wait();

    return 0;
}

Waiting for task completion

Usually we need the ability to wait until all thread pool tasks are completed. That can be done with the boost::asio::thread_pool::wait member function. Consider this example:

#include <atomic>
#include <iostream>
#include <boost/asio/post.hpp>
#include <boost/asio/thread_pool.hpp>

int main()
{
    std::atomic<int> sum = 0;

    boost::asio::thread_pool workers(16);

    for(int n = 0; n < 16; ++n)
    {
        boost::asio::post(workers, [n, &sum]
        {
            sum += n;
        });
    }

    workers.wait();

    std::cout << "The sum is " << sum << "\n";

    return 0;
}

However, that may not work the way you expect. The thread pool's wait function blocks the calling thread until all tasks are completed, and that part is clear. The catch is that it also joins the worker threads, so if you submit more work to the thread pool after that, it won't be processed. And there is no way to restart a Boost.Asio thread pool within the same instance.

The simplest solution is to destroy the old object and create a new one:

auto workers = std::make_unique<boost::asio::thread_pool>(16);
// ...
workers->wait();
// ...
workers.reset();
workers = std::make_unique<boost::asio::thread_pool>(16);
// ...
workers->wait();
// ...

Let's integrate this into our thread_pool class:

#include <memory>

// ...

public:

    thread_pool(std::size_t threads, std::size_t capacity)
    : threads(threads)
    , queue  (capacity)
    {
        reset();
    }

    // ...

    void wait()
    {
        workers->wait();
    }

    void reset()
    {
        if(workers)
        {
            wait();
            workers.reset();
        }
        workers = std::make_unique<boost::asio::thread_pool>(threads);
    }

private:

    std::size_t const threads;
    std::unique_ptr<boost::asio::thread_pool> workers;
    // ...

This is simple and it works. There are a couple of issues though.

First, there is a gap between the destruction of the old thread pool object and the creation of the new one. If someone posts into the thread pool from another thread during this gap, the program will crash.

Second, joining, destroying, creating and starting OS threads is not a fast sequence of operations. If you need to wait for task completion once or twice per second, you can safely go this way. However, if for some reason you have to do it much more often, this approach is not efficient.

Summary

Let's summarize what we've done today:

  • We've taken the Boost.Asio thread pool implementation and started to build additional features on top of it;
  • We've implemented a way to obtain return values from submitted tasks;
  • We've limited the task queue size to a given capacity;
  • We've implemented a way to wait until all submitted tasks are completed, with our thread pool ready to do more work after that.

In the next part of “C++ thread pool” we'll improve some of those features and implement new ones.

Full source code for this article: source.zip.
