原子操作(atomic operation)是一个不可分割的操作。对于原子操作,我们永远无法观察到它完成到一半的状态,它要么做完了,要么没有做(并发编程书中的描述是“要么就没做完”,其实不太准确)。如果读取对象值的载入操作是原子的,则这个载入操作获取到的值要么是对象的初始值,要么是被修改后的某个值。

标准原子操作

标准原子类型在头文件<atomic>中,对这种类型的所有操作都是原子的。它们(几乎)都有一个成员函数is_lock_free(),决定在给定类型上的操作是直接用原子指定完成的(is_lock_free()返回true),还是通过使用编译器和类库内部的锁来模拟完成的(is_lock_free()返回false)。(再次吐槽书中的说法,“让用户决定在给定类型上的操作是否直接用原子指令完成,或者通过使用编译器和类库内部的锁来完成”,用户并不能决定,我们只能通过标准库查询这个状态)。看cppreference的例子

struct A { int a[100]; };
struct B { int x, y; };
int main()
{
    std::cout << std::boolalpha
        << "std::atomic<A> is lock free? "
        << std::atomic<A>{}.is_lock_free() << '\n'
        << "std::atomic<B> is lock free? "
        << std::atomic<B>{}.is_lock_free() << '\n';
}

输出为

std::atomic<A> is lock free? false
std::atomic<B> is lock free? true

唯一不提供is_lock_free()的类型是std::atomic_flag。它是一个布尔标识,并且在这个类型上的操作要求是无锁的。其余的原子类型都是通过std::atomic<>类模板特化实现的。标准的原子类型没有拷贝构造函数和拷贝赋值运算符,因此是不可复制和赋值的。当然它们支持从对应的内置类型隐式转换并赋值。例如,std::atomic<int>支持++--运算符,对应的成员函数为fetch_add()fetch_sub()

std::atomic_flag

std::atomic_flag是最简单的标准原子类型,代表一个布尔标志。该类型必须用ATOMOC_FLAG_INIT初始化,将布尔值设置为false。之后我们只能对它做三件事销毁、清除标志和设置并查询其先前的值,分别对应析构函数、clear()test_and_set()成员函数。

std::atomic_flag f = ATOMIC_FLAG_INIT;
f.clear(std::memory_order_relase);
bool x = f.test_and_set();

我们可以使用std::atomic_flag实现一个简单的自旋锁。

#include <atomic>
#include <iostream>
#include <thread>

class spinlock_mutex
{
    std::atomic_flag flag = ATOMIC_FLAG_INIT;
public:
    spinlock_mutex() {}

    void lock()
    {
        while (flag.test_and_set(std::memory_order_acquire));
    }

    void unlock()
    {
        flag.clear(std::memory_order_release);
    }
};

spinlock_mutex mutex;
int i = 0;

void add()
{
    mutex.lock();
    for (int time = 0; time < 10000; time++)
    {
        i++;
    }
    mutex.unlock();
}

int main()
{
    std::thread t1(add);
    std::thread t2(add);

    t1.join();
    t2.join();

    std::cout << "i = " << i;
}

结果会输出i的值为20000。

std::atomic类模板

最基本的原子整数类型是std::atomic<bool>。它的功能比std::atomic_flag更全,尽管仍然不可拷贝构造和赋值,但可以通过非原子的布尔进行构造。

std::atomic<bool> b(true);
b = false;

std::atomic_flag不同的是,对std::atomic<T>类模板的操作通过更为通用的store()load()exchange()成员函数进行,分别对应写、读和读-修改-写操作。

std::atomic<bool> b;
bool x = b.load(std::memory_order_acquire);
b.store(true);
x = b.exchange(false, std::memory_order_acq_rel);

除了exchange(),std::atomic还引入了compare_exchange_weak()compare_exchange_weak()进行读-修改-写操作。

compare_exchange_weak()

bool compare_exchange_weak (T& expected, T val, memory_order sync = memory_order_seq_cst) volatile noexcept;
bool compare_exchange_weak (T& expected, T val, memory_order sync = memory_order_seq_cst) noexcept;
    
bool compare_exchange_weak (T& expected, T val, memory_order success, memory_order failure) volatile noexcept;
bool compare_exchange_weak (T& expected, T val, memory_order success, memory_order failure) noexcept;

比较并交换被封装的值(weak)与参数expected所指定的值是否相等,如果:

  1. 相等,则用val替换原子对象的旧值。
  2. 不相等,则用原子对象的旧值替换expected,因此调用该函数之后,如果被该原子对象封装的值与参数expected所指定的值不相等,expected中的内容就是原子对象的旧值。

该函数通常会读取原子对象封装的值,如果比较为true(即原子对象的值等于expected),则替换原子对象的旧值,但整个操作是原子的,在某个线程读取和修改该原子对象时,另外的线程不能对读取和修改该原子对象。

在后面两个重载中,内存序(Memory Order)的选择取决于比较操作结果,如果比较结果为true(即原子对象的值等于 expected),则选择参数success指定的内存序,否则选择参数failure所指定的内存序。

注意,该函数直接比较原子对象所封装的值与参数expected的物理内容,所以某些情况下,对象的比较操作在使用operator==()判断时相等,但compare_exchange_weak()判断时却可能失败,因为对象底层的物理内容中可能存在位对齐或其他逻辑表示相同但是物理表示不同的值(比如true和2或3,它们在逻辑上都表示"真",但在物理上两者的表示并不相同)。

对于compare_exchange_weak(),即使原子对象封装的值等于expected,仍有可能返回false,值也不会更新为val,即允许伪失败(spurious failure)。这最有可能发生在缺少单个的比较并交换指令的机器上。由于伪失败可能会发生,它通常被用在循环中:

bool expected = false;
extern atomic<bool> b;
while(!b.compare_exchange_weak(expected, true) && !expected);

compare_exchange_strong()不会有伪失败发生,即保证仅当实际值不等于expected时才会返回false。看下面Cplusplus-Concurrency-In-Practice的一个例子

#include <iostream>       // std::cout
#include <atomic>         // std::atomic
#include <thread>         // std::thread
#include <vector>         // std::vector

// a simple global linked list:
struct Node { int value; Node* next; };
std::atomic<Node*> list_head(nullptr);
 
void append(int val)
{
    // append an element to the list
    Node* newNode = new Node{val, list_head};
 
    // next is the same as: list_head = newNode, but in a thread-safe way:
    while (!list_head.compare_exchange_weak(newNode->next,newNode)) {}
    // (with newNode->next updated accordingly if some other thread just appended another node)
}
 
int main ()
{
    // spawn 10 threads to fill the linked list:
    std::vector<std::thread> threads;
    for (int i = 0; i < 10; ++i) threads.push_back(std::thread(append, i));
    for (auto& th : threads) th.join();
 
    // print contents:
    for (Node* it = list_head; it!=nullptr; it=it->next)
        std::cout << ' ' << it->value;
 
    std::cout << '\n';
 
    // cleanup:
    Node* it; while (it=list_head) {list_head=it->next; delete it;}
 
    return 0;
}

该例使用compare_exchange_weak()构建了无锁的链表。一个可能输出为:

9 8 7 6 5 4 3 2 1 0

在书的第七章会详细介绍无锁数据结构。