前面我们介绍了如何去保护线程间共享的数据。除了保护数据,我们还希望可以让一个线程等待另一个线程完成任务。很容易想到的方法是在线程间共享一个表示任务是否完成的标识。这不是个理想的方法。c++标准库提供了条件变量(condition variables)和期值(future)更好的解决了这个问题。下面我们直接看如何使用条件变量。
用条件变量等待条件
标准库提供了两个条件变量的实现,分别是std::condition_variable
和std::condition_variable_any
,均在头文件<condition_variable>
中。两者均需要配合互斥元使用。前者需要和标准库的std::mutex
一起使用,后者可以同符合称为互斥元的最低标准的对象使用,因此在带来额外性能开销的同时,提供了更多的灵活性。
std::mutex m;
std::queue<data_chunk> data_queue;
std::condition_variable data_cond;
void data_preparation_thread()
{
while(more_data_to_prepare())
{
data_chunk const data = prepare_data();
std::lock_guard<std::mutex> lk(m);
data_queue.push(data);
data_cond.notify_one();
}
}
void data_processing_thread()
{
while(true)
{
std::unique_lock<std::mutex> lk(m);
data_cond.wait(
lk, []{ return !data_queue.empty(); });
data_chunk data = data_queue.front();
data_queue.pop;
lk.unlock();
process(data);
if(is_last_chunk(data))
break;
}
}
我们有一个在线程间传递数据的队列data_queue
。在准备数据的线程data_preparation_thread()
中,先将互斥元锁定再将数据压入队列。随后调用条件变量的成员函数notify_one()
,通知等待的线程数据已准备好。在处理数据的线程data_processing_thread()
中,使用std::unique_lock
将互斥元锁定。随后条件变量的wait()
函数被调用,传入的参数是互斥锁lk
和一个lambda函数。实际上这个lambda函数会返回一个bool值,表示等待的条件。如果条件满不足,即数据队列为空,lambda函数返回false,wait()
会解锁互斥元,并将该线程置于阻塞状态(阻塞至本行,wait()
不会返回)。当数据准备线程发出通知,即调用notify_one()
后,数据处理线程会被唤醒,重新锁定互斥元并再次检测等待条件。如果不满足会再次进入休眠状态;如果满足,wait()
会返回,继续进行后面的处理。这里使用了std::unique
而不是std::lock_guard
,因为后者没有在等待期间解锁互斥元的灵活性,一旦上锁并且处理线程进入休眠,准备线程将无法锁定互斥元,等待的条件永远无法得到满足。wait()
的第二个参数不仅可以接收lambda函数,任何函数和可调用对象都可以,如函数指针,std::function
,std::bind
等。当然也不可以忽略第二个参数,使用方式见该知乎问题。
使用条件变量建立一个线程安全的队列
先看下标准库的std::queue
接口。(或许应该再开一个关于《stl源码剖析》的专栏。。。)
template<class T, class Container = std::deque<T>>
class queue
{
explicit queue(const Container&);
explicit queue(Container&& = Container());
template<class Alloc> explicit queue(const Alloc&);
template<class Alloc> queue(const Container&, const Alloc&);
template<class Alloc> queue(Container&&, const Alloc&);
template<class Alloc> queue(queue&&, const Alloc&);
void swap(queue& q);
bool empty() const;
size_type size() const;
T& front();
const T& front() const;
T& back();
const T& back() const;
void push(const T& x);
void push(T&& x);
void pop();
template<class... Args> void emplace(Args&&... args);
};
我们重新建立一个线程安全的队列的接口如下:
#include <memory>
template<typename T>
class threadSafe_queue
{
public:
threadSafe_queue();
threadSafe_queue(const threadSafe_queue&);
threadSafe_queue& operator=(const threadSafe_queue&) = delete;
void push(T new_value);
bool try_pop(T& value);
std::shared_ptr<T> try_pop();
void wait_and_pop(T& value);
std::shared_ptr<T> wait_and_pop();
bool empty() const;
};
我们如何用条件变量实现相关接口呢?
#include <memory>
#include <mutex>
#include <condition_variable>
template<typename T>
class threadSafe_queue
{
private:
std::mutex m;
std::queue<T> data_queue;
std::condition_variable data_cond;
public:
void push(T new_value)
{
std::lock_guard<std::mutex> lk(m);
data_queue.push(new_value);
data_cond.notify_one();
}
void wait_and_pop(T& value)
{
std::unique_lock<std::mutex> lk(m);
data_cond.wait(lk, [this] { return !data_queue.empty(); });
value = data_queue.front();
data_queue.pop();
}
};
threadSafa_queue<data_chunk> data_queue;
void data_preparation_thread()
{
while(more_data_to_process())
{
data_chunk const data = prepare_data();
data_queue.push(data);
}
}
void data_processing_thread()
{
while(true)
{
data_chunk data;
data_queue.wait_and_pop(data);
process(data);
if(is_last_chunk(data))
break;
}
}
将条件变量的使用移到了队列的内部实现,无需在使用时进行额外处理。一个完整的线程安全的队列实现如下:
#include <memory>
#include <queue>
#include <mutex>
#include <condition_variable>
template<typename T>
class threadSafe_queue
{
private:
mutable std::mutex m;
std::queue<T> data_queue;
std::condition_variable data_cond;
public:
threadSafe_queue() {}
threadSafe_queue(const threadSafe_queue& other)
{
std::lock_guard<std::mutex> lk(ohter.m);
data_queue = other.data_queue;
}
threadSafe_queue& operator=(const threadSafe_queue&) = delete;
void push(T new_value)
{
std:lock_guard<std::mutex> lk(m);
data_queue.push(new_value);
data_cond.notify_one();
}
bool try_pop(T& value)
{
std::lock_guared<std::mutex> lk(m);
if(data_queue.empty())
return false;
value = data_queue.front();
data_queue.pop();
return true;
}
std::shared_ptr<T> try_pop()
{
std::lock_guard<std::mutex> lk(m);
if(data_queue.empty())
return std::shared_ptr<T>();
std::shared_ptr<T> res(std::make_shared<T>(data_queue.front());
data_queue.pop();
return res;
}
void wait_and_pop(T& value)
{
std::unique_lock<std::mutex> lk(m);
data_cond.wait(lk, [this] { return !data_queue.empty(); });
value = data_queue.front();
data_queue.pop();
}
std::shared_ptr<T> wait_and_pop()
{
std::unique<std::mutex> lk(m);
data_cond.wait(lk, [this] { return !data_queue.empty(); });
std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
data_queue.pop();
return res;
}
bool empty() const
{
std::lock_guard<std::mutex> lk(m);
return data_queue_empty();
}
};
0 条评论