前面我们介绍了如何去保护线程间共享的数据。除了保护数据,我们还希望可以让一个线程等待另一个线程完成任务。很容易想到的方法是在线程间共享一个表示任务是否完成的标识。这不是个理想的方法。c++标准库提供了条件变量(condition variables)和期值(future)更好的解决了这个问题。下面我们直接看如何使用条件变量。

用条件变量等待条件

标准库提供了两个条件变量的实现,分别是std::condition_variablestd::condition_variable_any,均在头文件<condition_variable>中。两者均需要配合互斥元使用。前者需要和标准库的std::mutex一起使用,后者可以同符合称为互斥元的最低标准的对象使用,因此在带来额外性能开销的同时,提供了更多的灵活性。

std::mutex m;
std::queue<data_chunk> data_queue;
std::condition_variable data_cond;

void data_preparation_thread()
{
    while(more_data_to_prepare())
    {
        data_chunk const data = prepare_data();
        std::lock_guard<std::mutex> lk(m);
        data_queue.push(data);
        data_cond.notify_one();        
    }
}
void data_processing_thread()
{
    while(true)
    {
        std::unique_lock<std::mutex> lk(m);
        data_cond.wait(
            lk, []{ return !data_queue.empty(); });
        data_chunk data = data_queue.front();
        data_queue.pop;
        lk.unlock();
        process(data);
        if(is_last_chunk(data))
            break;
    }
}

我们有一个在线程间传递数据的队列data_queue。在准备数据的线程data_preparation_thread()中,先将互斥元锁定再将数据压入队列。随后调用条件变量的成员函数notify_one(),通知等待的线程数据已准备好。在处理数据的线程data_processing_thread()中,使用std::unique_lock将互斥元锁定。随后条件变量的wait()函数被调用,传入的参数是互斥锁lk和一个lambda函数。实际上这个lambda函数会返回一个bool值,表示等待的条件。如果条件满不足,即数据队列为空,lambda函数返回false,wait()会解锁互斥元,并将该线程置于阻塞状态(阻塞至本行,wait()不会返回)。当数据准备线程发出通知,即调用notify_one()后,数据处理线程会被唤醒,重新锁定互斥元并再次检测等待条件。如果不满足会再次进入休眠状态;如果满足,wait()会返回,继续进行后面的处理。这里使用了std::unique而不是std::lock_guard,因为后者没有在等待期间解锁互斥元的灵活性,一旦上锁并且处理线程进入休眠,准备线程将无法锁定互斥元,等待的条件永远无法得到满足。
wait()的第二个参数不仅可以接收lambda函数,任何函数和可调用对象都可以,如函数指针,std::functionstd::bind等。当然也不可以忽略第二个参数,使用方式见该知乎问题

使用条件变量建立一个线程安全的队列

先看下标准库的std::queue接口。(或许应该再开一个关于《stl源码剖析》的专栏。。。)

template<class T, class Container = std::deque<T>>
class queue
{
    explicit queue(const Container&);
    explicit queue(Container&& = Container());

    template<class Alloc> explicit queue(const Alloc&);
    template<class Alloc> queue(const Container&, const Alloc&);
    template<class Alloc> queue(Container&&, const Alloc&);
    template<class Alloc> queue(queue&&, const Alloc&);

    void swap(queue& q);

    bool empty() const;
    size_type size() const;

    T& front();
    const T& front() const;
    T& back();
    const T& back() const;

    void push(const T& x);
    void push(T&& x);
    void pop();
    template<class... Args> void emplace(Args&&... args);
};

我们重新建立一个线程安全的队列的接口如下:

#include <memory>

template<typename T>
class threadSafe_queue
{
public:
    threadSafe_queue();
    threadSafe_queue(const threadSafe_queue&);
    threadSafe_queue& operator=(const threadSafe_queue&) = delete;

    void push(T new_value);

    bool try_pop(T& value);
    std::shared_ptr<T> try_pop();

    void wait_and_pop(T& value);
    std::shared_ptr<T> wait_and_pop();

    bool empty() const;
};

我们如何用条件变量实现相关接口呢?

#include <memory>
#include <mutex>
#include <condition_variable>

template<typename T>
class threadSafe_queue
{
private:
    std::mutex m;
    std::queue<T> data_queue;
    std::condition_variable data_cond;
public:
    void push(T new_value)
    {
        std::lock_guard<std::mutex> lk(m);
        data_queue.push(new_value);
        data_cond.notify_one();
    }

    void wait_and_pop(T& value)
    {
        std::unique_lock<std::mutex> lk(m);
        data_cond.wait(lk, [this] { return !data_queue.empty(); });
        value = data_queue.front();
        data_queue.pop();
    }
};

threadSafa_queue<data_chunk> data_queue;
void data_preparation_thread()
{
    while(more_data_to_process())
    {
        data_chunk const data = prepare_data();
        data_queue.push(data);
    }
}
void data_processing_thread()
{
    while(true)
    {
        data_chunk data;
        data_queue.wait_and_pop(data);
        process(data);
        if(is_last_chunk(data))
            break;
    }
}

将条件变量的使用移到了队列的内部实现,无需在使用时进行额外处理。一个完整的线程安全的队列实现如下:

#include <memory>
#include <queue>
#include <mutex>
#include <condition_variable>

template<typename T>
class threadSafe_queue
{
private:
    mutable std::mutex m;
    std::queue<T> data_queue;
    std::condition_variable data_cond;
public:
    threadSafe_queue() {}

    threadSafe_queue(const threadSafe_queue& other)
    {
        std::lock_guard<std::mutex> lk(ohter.m);
        data_queue = other.data_queue;
    }

    threadSafe_queue& operator=(const threadSafe_queue&) = delete;

    void push(T new_value)
    {
        std:lock_guard<std::mutex> lk(m);
        data_queue.push(new_value);
        data_cond.notify_one();
    }

    bool try_pop(T& value)
    {
        std::lock_guared<std::mutex> lk(m);
        if(data_queue.empty())
            return false;
        value = data_queue.front();
        data_queue.pop();
        return true;
    }
    std::shared_ptr<T> try_pop()
    {
        std::lock_guard<std::mutex> lk(m);
        if(data_queue.empty())
            return std::shared_ptr<T>();
        std::shared_ptr<T> res(std::make_shared<T>(data_queue.front());
        data_queue.pop();
        return res;
    }

    void wait_and_pop(T& value)
    {
        std::unique_lock<std::mutex> lk(m);
        data_cond.wait(lk, [this] { return !data_queue.empty(); });
        value = data_queue.front();
        data_queue.pop();
    }

    std::shared_ptr<T> wait_and_pop()
    {
        std::unique<std::mutex> lk(m);
        data_cond.wait(lk, [this] { return !data_queue.empty(); });
        std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
        data_queue.pop();
        return res;
    }

    bool empty() const
    {
        std::lock_guard<std::mutex> lk(m);
        return data_queue_empty();
    }
};