C++11 并发编程:深入理解内存模型和原子类型

C++11并发编程中,利用内存模型和原子类型实现无锁化并发性能优化,讲解了原子操作、内存序和同步机制。

原文标题:【硬核】C++11并发:内存模型和原子类型

原文作者:阿里云开发者

冷月清谈:

本文介绍了 C++11 并发编程中的内存模型和原子类型,并结合代码示例讲解了如何进行无锁化并发性能优化。

**内存模型**

C++ 中的对象是一块连续的内存区域,每个对象至少占用一个内存地址。理解对象和内存地址的关系对于并发编程至关重要,因为多线程访问共享内存地址时可能存在竞态条件。为了避免竞态条件,可以使用互斥锁或原子操作来保证线程间访问的有序性。

C++ 对象有一个修改顺序,由所有线程的写操作组成。所有线程必须同意这个顺序。如果修改的对象不是原子类型的,需要使用同步工具(如互斥锁)来保证一致性,而原子操作则由编译器保证同步。

**原子类型**

C++ 标准库提供了各种基本数据类型的原子类型,例如 `std::atomic`。原子操作不可分割,避免了竞态条件。

原子类型的一些关键点:
1. 不能拷贝和赋值;
2. 可以通过 `is_lock_free()` 判断是否是无锁实现;
3. 提供了 `compare_exchange_weak()` 和 `compare_exchange_strong()` 方法用于 CAS 操作;
4. 不要使用自定义的原子类型;
5. C++ 指针也具有原子类型,例如 `std::atomic`。

**同步操作和强制排序**

为了避免竞态条件,可以使用原子变量来指示数据是否准备好,其他线程只有在标记被设置后才能读取数据。原子变量提供了必要的内存序列模型 `happens-before` 和 `synchronizes-with`。

文章介绍了 `synchronized-with` 和 `happens-before` 两种同步关系,以及原子操作的内存序标记,包括顺序一致序列、松散序列、获取-释放序列、数据依赖和栅栏。

顺序一致序列是最直接的排序规则,但也是最消耗内存的。松散序列不保证跨线程的原子操作顺序。获取-释放序列提供部分同步,比顺序一致序列更高效。数据依赖是一种较弱的同步方式,只约束相关的数据。栅栏可以限制编译器或硬件对不相关变量的重排,并引入新的 `happens-before` 和 `synchronizes-with` 关系。

怜星夜思:

1、文章提到了无锁化并发,相比于使用锁的并发,它有什么优势和劣势?在实际项目中,如何选择合适的并发方式?
2、文章中提到了几种不同的内存序,例如顺序一致性、松散序列等,它们之间有什么区别?在实际应用中,如何选择合适的内存序?
3、文章提到了不要使用自定义的原子类型,这是为什么?有没有一些例外情况?

原文内容

阿里妹导读


本文从C++11并发编程中的关键概念——内存模型与原子类型入手,结合详尽的代码示例,抽丝剥茧地介绍了如何实现无锁化并发的性能优化。

引言

在游戏后台领域C++一直是主流开发语言。就算是在服务器性能已经非常强大的今天,我依然觉得游戏行业对于性能的突破的需求还远没有结束。大家手机游戏动辄要120hz,这意味着游戏的战斗服的帧数必须超过120hz。以前我一台服务器装下几十人在线战斗刷副本已经很不错了,但是像万龙觉醒类型的SLG动辄就是几百好人同屏战斗,上万人在同一台服务器战斗,基于此每一台机器的性能都必须充分压榨。无锁化并发的性能优化需要后台开发者熟悉C++的内存模型和原子类型的使用。

一、内存模型基础

内存模型有两个方面:一个是基本结构,这与内存中存储数据的布局有关;另外一个是并发性能方面。数据结构对于并发性能很重要,特别在于 low-level 的原子操作。下面将从对象和内存地址开始介绍。


1.1 对象和内存地址

与Java,Ruby等面向对象语言普遍理解的 “everythings is object” 不同,C++的对象是 “a  region of storage”。C++的对象是一个紧密聚集在一块的内存区域。有四个重要的特点:

(1)每一个变量都是一个对象,包括成员变量;
(2)每一个对象至少占用一个内存地址;
(3)基本变量无论多大都只有一个内存地址;

(4)相邻bit域下的内存地址是一致的。

以上这些特点怎么理解呢,和我们的系统的内存分配机制有关系哈,举一个栗子就能明白。

class zoo
{
public:
 int m_number;
 Pig m_onePig;
 PigHome* p_pigHome;
}

上面定义了一个C++类,当我们在内存中创建一个zoo对象时其实是划分了一个连续的内存区域,并且赋予了一个内存地址指向这个内存区域,这就是特点2,至少占用一个内存地址。zoo里面定义了成员变量 m_number,m_onePig和p_pigHome。因为m_number和p_pigHome是基本变量(指针算特殊基本变量)本身只有一个内存地址的,命中特点3。在zoo开辟的这块内存地址是连续的,m_number,m_onePig和p_pigHome的内存地址,都属于zoo的内存地址内,这是特点4。


1.2 对象,内存地址与并发

C++ 多线程应用最重要的点:都取决于内存地址。如果有线程更新多个线程共享的内存地址的内容就存在竞态条件(the race condition)。为了避免竞态条件,就需要使多个线程按顺序访问资源。一种方式是使用互斥锁(mutex);另外一种就是使用具有原子性的(atomic)同步属性去操作同一个内存地址或者其他内存地址,使得线程间的访问有序化。

对于同一个内存的访问如果没有强制的顺序的话,数据竞争导致的情况是未定义的。


1.3 修改顺序

从对象的初始化开始,C++对象都有定义一个修改顺序,这个顺序由所有线程的写组成。大多数情况下,每次程序运行的顺序都是不同的,但是全部线程都要同意按这顺序进行修改。如果修改的对象不是原子性的,就需要使用必要的同步工具(互斥锁等)保证线程对每一个变量的修改顺序达成一致。如果使用原子操作(atomic operations),则编译器负责保证同步的到位。

这使得某些类型的推测执行是不被允许的。线程执行修改的过程中,看到一个特定的条目后,这个线程后续的读必须返回最新值,并且后续的写必须发生在修改之后。同样,在同一个线程中,写一个对象的值之后再读这个对象的值,那么必须是最新写的值。

C++执行顺序问题执行顺序问题比较复杂,这里稍微展开方便对后面内容的理解。首先,在代码没有特别标志的情况下,主流的C++编译器为了达到更好的执行效率,往往会对我们所写的代码进行编译重排。这就导致了实际执行的代码和我们所写的代码的顺序有所不同。此外,程序执行过程实际是一条条CPU指令,为了执行过程中CPU流水线效率的最大化,在不影响结果的情况下允许执行指令的重排。以上重排都基于保证单线程执行结果的正确性的。另外,现在主流的硬件都是多核架构,不同线程可能运行在不同的CPU核心上。这存在不同线程对同一个写数据可能发生在不同核心的缓存中,而缓存有效性和数据写回实际内存的时机是需要同步来保证的。C++将这种保证也交给到了开发者,开发者可以灵活使用,优化执行效率。

二、C++ 标准库提供的原子类型

原子操作不可分割,只有完成和未完成,无中间态。如果读写操作都是原子的,那么读到的数据要么是初始值,要么就是修改后的值。对于一个非原子数据的同时读写,就会存在竞态,导致读取到的值是未定义的。我们很快就会想到使用互斥锁来保证这样的读写。此外,C++对常用的数据类型都提供了原子类型,通过编译器在编译优化中保障读写过程的同步(往往比互斥锁的效率要高) 。


2.1 标准原子类型





上面这个表格列出的是C++基本数据类型的原子类型。这些原子类型的方法和使用规则可以参考这个链接:C++11 原子类型与原子操作[1],篇幅有限不一一说明。下面列出一些个人认为比较关键的点。


2.2 原子类型比较关键的点

(1) 标准原子类型是不能拷贝和赋值,他们没有拷贝构造函数和拷贝赋值操作;

(2)C++ 原子类很多是无锁实现的,但是这也与编译器和其运行的平台有关,所以可以通过 is_lock_free() 来判断是否是无锁的;

(3)根据当前值判定是否存储新值(CAS,Compare and exchange):原子类型的为CAS操作提供了两个方法分别是:

  • compare_exchange_weak()
  • compare_exchange_strong()

在一些不支持 compare-and-exchange 单指令的机器上,如果处理器不能保证操作以原子方式完成,compare_exchange_week()即使期望值与当前值是相等的也会存储新值失败。(PS:操作系统的线程比处理器多时,执行操作的线程在必要的指令序列中间被切换掉,另一个线程被操作系统安排在它的位置上。这种情况被称为 spurious failure)。compare_exchange_strong()则保证不会有spurious failure。

为了避免可能存在的 spurious failure,compare_exchange_weak() 可以采取自旋的方式使用:

bool expected = false;
extern atomic<bool> b;
while(!b.compare_exchange_weak(expected, true) && !expected);// 当期望值与实际值不等,会修改期望值为实际值

简单值建议使用compare_exchange_weak(),计算复杂和存储耗时的值可以使用compare_exchange_strong()。如果确定处理器不会出现spurious failure(例如 X86)直接使用compare_exchange_weak()。

(4)虽然有std::atomic<>,但是不要使用自定义的原子类型。原因有很多,例如原子类型的CAS操作是基于memcmp()和memcpy()的,而atomic<float>等浮点类型有有精度误差的,不能保证CAS的正确性;编译器通常都会使用内部锁来处理这类自定义的原子类型,如果自定义类型存在默认的拷贝赋值就会带来很大的问题。总而言之,自定义类型很难保证类型本身完全符合原子类型的标准,不要去使用自定义的原子类型。

(5)C++指针是有原子类型的:std::atomic<T*>。

三、同步操作和强制排序

假设有两个线程,其中一个准备去填充一块数据,为了确保这块数据没有竞态问题,它为这个数据设置了一个标记来指示这块数据是否是准备好的,另外一个线程直到这个标记被设置了才能读这块数据。下面的代码是一个简单的例子供参考。

std::vector<int> data;
std::atomic<bool> data_ready(false);

void reader_thread()
{
   while(!data_ready.load())
   {
       std::this_thread::yield();
   }
   std::cout << “data is” << data[0]  << std::endl;

}

void write_thread()
{
   data.push_back(1);
   data_ready = true; //  std::atomic<bool> 重载了 =
}

上面这段代码这段代码非常符合直觉,程序会输出"1"。这段代码中原子变量data_ready 提供了必要的内存序列模型 happens-before 和 synchronizes_with。

  • 写 data 发生于写 data_ready 之前;
  • 读 data_ready 发生于读 data 之前;

happens-before是符合传递律的,所以写 data 发生于读 data 之前,这也就意味着,data_ready 设置为 true时,data 的写已经同步给了data 的读。上面的代码是存在一个强制设定的顺序的。


3.1 synchronized-with 关系

下面就是介绍两种同步关系了,首先讲synchronized-with 关系。在我把它翻译成同步对应关系。

只有在原子类型的操作之间才能获得synchronized -with关系。如果数据结构包含原子类型,并且对该数据结构的操作在内部执行适当的原子操作,则数据结构上的操作(例如锁定互斥锁)可能提供这种关系,但基本上它只来自于对原子类型的操作。

如果线程A存储了一个值,而线程B读取了这个值,那么线程A中的存储和线程B中的读之间就存在synchronized -with关系(这个值是原子类型的哦,不是的话这个过程会导致未知的结果)。

为了能更通俗的理解,就是一个线程1对原子变量x进行写的操作,就需要保障其他读x值的线程能正确读到x被修改后的值。


3.2 happens-before 关系

happens-before 关系是程序中操作顺序的基本结构块,它指定了哪些操作可以看到其他操作的效果。我把他翻译成前后关系。如果一个线程上的操作A发生在另一个线程上的操作B之前,那么A和B就有前后关系了。

happens-before是符合传递律的。A happens-before B , B happens-before C, then A happens-before C。这个传递规律也可以被总结为 ordered-before 关系(排序前后关系)。


3.3 原子操作的内存序标记

下面间介绍用于原子操作的内存序标记和他们的同步关系(synchronized-with)。C++提供了6种内存排序如下表所示:参考传送门C++之Memory order[2]





3.3.1 顺序一致序列(sequentially consistent ordering)
C++ 原子类型的默认就是顺序一致性排序的,使用的是:

std::memory_order_seq_cst 来标识。从同步的角度来说,对于同一个原子变量的读写,与声明的顺序是一致的。这种情况符合人的直觉,使用一个简单例子来说明这个一致性规则。

#include <atomic>
#include <thread>
#include <iostream>
#include <assert.h>
std::atomic<bool> x,y;

void write_x()
{
   x.store(true,std::memory_order_seq_cst);// x 设置为true
}

void write_y()
{
   y.store(true,std::memory_order_seq_cst);// y 设置为true
}

void read_x_then_y()
{
   while(x.load(std::memory_order_seq_cst) == false);
   if(y.load(std::memory_order_seq_cst) == true)
   {
       std::cout << “ok1” << std::endl;// x变成true时候y也是true则输出ok1
   }
}

void read_y_then_x()
{
   while(y.load(std::memory_order_seq_cst) == false);
   if(x.load(std::memory_order_seq_cst) == true)
   {
       std::cout << “ok2” << std::endl;// y变成true时候x也是true则输出ok2
   }
}

int main()
{
   x = false;
   y = false;
   std::thread a(write_x);
   std::thread b(write_y);
   std::thread c(read_x_then_y);
   std::thread d(read_y_then_x);
   a.join();
   b.join();
   c.join();
   d.join();
}

上面的这段代码的结果是总都会打出ok,可能打印两次也可能是一次。情况1:x变成true时,如果y还是false则 read_x_then_y() 提起结束, "ok1" 不会输出,此时read_y_then_x()还在等待,直到y变成true,此时x已经变成true,"ok2" 打印;情况2:y变成true,x还是false,和情况1类似,打印 “ok1”;情况三,x和y都是true,打印 "ok1" 和 “ok2”。

顺序一致性规则是最直接和符合直觉的排序规则,但是它是最费内存的,因为需要全部线程进行全局的同步。在多处理器系统下还需要额外花销,多个处理器间需要的同步通讯的时间花销。

3.3.2 松散序列(relaxed ordering)

松散排序的原子操作,不构成 synchronized-with 关系。在同一个线程中执行对同一个原子变量仍然保持着happen-before的关系(保序),但是跨线程的原子操作不保序。相对于顺序一致排序,使用松散排序可能使不同的线程看到的变量修改顺序不一致。下面是松散排序的一个简单例子。

#include <atomic>
#include <thread>
#include <assert.h>
std::atomic<bool> x,y;
std::atomic<int> z;
void write_x_then_y()
{
   x.store(true,std::memory_order_relaxed);// 保证 x 的原子写
   y.store(true,std::memory_order_relaxed);// 保证 y 的原子写
}

void read_y_then_x()
{
   while(!y.load(std::memory_order_relaxed));
   if(x.load(std::memory_order_relaxed))
     ++z;// 如果 y为 true x 已经为ture 则 z++
}

int main() {
   x=false;
   y=false;
   z=0;
   std::thread a(write_x_then_y);
   std::thread b(read_y_then_x);
   a.join();
   b.join();
   assert(z.load()!=0);
}

上述过程的断言可能会发生!std::memory_order_relaxed 只保证了同一线程内同一原子变量的 happen-before 关系(注意这是在同一原子变量)。在线程write_x_then_y() 中写a和写b是可以自由重排的,因为他们没有强制的happens-before 关系。此外,即使在线程write_x_then_y() 中 ,x 先设置为 true,y 再设置为 true,由于不保证同步关系,即x变成true和y变成true这个事情并不一定保证按照这个修改顺序通知给线程 read_y_then_x()。下面是一个稍微复杂的例子。

#include <thread>
#include <atomic>
#include <iostream>
using namespace std;

atomic_int x(0), y(0), z(0);
atomic_bool go(false);
const unsigned int loop_count(10);

struct read_value
{
   int x, y, z;
};

read_value v1[loop_count];
read_value v2[loop_count];
read_value v3[loop_count];
read_value v4[loop_count];
read_value v5[loop_count];

void increment(atomic_int* var_to_inc, read_value* read_value_ptr)
{
   while (!go)
       std::this_thread::yield();
   for(unsigned i = 0; i < loop_count; i++)
   {
       read_value_ptr[i].x = x.load(std::memory_order_relaxed);
       read_value_ptr[i].y = y.load(std::memory_order_relaxed);
       read_value_ptr[i].z = z.load(std::memory_order_relaxed);
       var_to_inc->store(i + 1, std::memory_order_relaxed);
       std::this_thread::yield();
   }
}

void read_vals(read_value* read_value_ptr)
{
   while (!go)
       std::this_thread::yield();
   for(unsigned i = 0; i < loop_count; i++)
   {
       read_value_ptr[i].x = x.load(std::memory_order_relaxed);
       read_value_ptr[i].y = y.load(std::memory_order_relaxed);
       read_value_ptr[i].z = z.load(std::memory_order_relaxed);
       std::this_thread::yield();
   }

}

void print(read_value* read_value_ptr)
{
   for(unsigned i = 0; i < loop_count; i++)
   {
       if(i)
       {
           cout << “,”;
       }
       cout << “(” << read_value_ptr[i].x << “,” << read_value_ptr[i].y << “,” << read_value_ptr[i].z << “)”;
   }
   cout << endl;
}

int main()
{
   std::thread t1(increment, &x, v1);
   std::thread t2(increment, &y, v2);
   std::thread t3(increment, &z, v3);
   std::thread t4(read_vals, v4);
   std::thread t5(read_vals, v5);
   go = true;
   t5.join();
   t4.join();
   t3.join();
   t2.join();
   t1.join();

   print(v1);
   print(v2);
   print(v3);
   print(v4);
   print(v5);
   return 0;
}

这段代码看似复杂其实就是对原子类型的x,y,z的10次由1到10的赋值运算。线程 t1 负责 x 的由1到10的10次赋值,v1 负责记录10次赋值中 t1 读取到的数据三元组(x, y, z)的数值变化。线程t2 负责 y 的由1到10的10次赋值,三元组(x, y, z)数值变化记录在v2。线程t3 负责 z 的由1到10的10次赋值,三元组(x, y, z)数值变化记录在v3。线程t4和线程t5都是观察记录三元组(x, y, z)的变化,分别记录在v4和v5。

这段代码的一个运行结果:

(0,0,0),(1,3,1),(2,3,2),(3,7,3),(4,8,4),(5,10,5),(6,10,6),(7,10,7),(8,10,8),(9,10,9)
(0,0,0),(0,1,0),(1,2,1),(1,3,1),(2,4,2),(2,5,2),(2,6,2),(4,7,4),(4,8,4),(4,9,4)
(1,2,0),(2,3,1),(2,4,2),(4,7,3),(4,8,4),(6,10,5),(7,10,6),(8,10,7),(8,10,8),(10,10,9)
(0,0,0),(0,0,0),(0,1,0),(0,1,0),(0,2,1),(1,2,1),(1,3,1),(1,3,1),(1,3,1),(1,3,1)
(10,10,10),(10,10,10),(10,10,10),(10,10,10),(10,10,10),(10,10,10),(10,10,10),(10,10,10),(10,10,10),(10,10,10)

上面的结果显示可以看出:

1.线程2对y的累加过程中,x和z有3轮是没有变化的;
2.线程4对整个数据的监控会发现,数据有好几轮是没有同步的;

3.整体数据非常松散;

如果取消松散排序采用默认一致性排序的一种结果如下。相比之下,数据更加紧凑。如果有兴趣可以改一下代码,测试两种方式的执行效率。

(0,0,0),(1,3,1),(2,3,2),(3,4,3),(4,5,4),(5,6,4),(6,10,6),(7,10,7),(8,10,8),(9,10,8)
(0,0,0),(0,1,0),(1,2,1),(2,3,2),(3,4,3),(3,5,3),(5,6,4),(5,7,4),(5,8,6),(6,9,6)
(1,2,0),(2,3,1),(3,3,2),(3,4,3),(5,6,4),(6,8,5),(6,10,6),(8,10,7),(9,10,8),(10,10,9)
(2,3,3),(3,4,3),(3,4,3),(3,4,3),(3,5,3),(3,5,3),(3,5,3),(3,5,3),(3,5,4),(5,6,4)
(10,10,10),(10,10,10),(10,10,10),(10,10,10),(10,10,10),(10,10,10),(10,10,10),(10,10,10),(10,10,10),(10,10,10)
3.3.3 获取-释放序列(acquire-release ordering)

获取-释放序列存在一定的同步关系,但是不是全局的。此内存模型下:

原子读(std::memory_order_acquire)是 acquire;
原子写(std::memory_order_release)是 release;
原子RMW(read-motify-write)操作(例如 fetch_add() 或者 exchange)
可以是acquire,release ,也可以是二者的合并(std::memory_order_acq_rel)。


线程间的读写是一种成对的关系,他们之间是有同步关系的。不同线程还是会看得不同的执行顺序,但是这个执行顺序是被限制的。怎么解释上面说所的执行顺序的限制,看下面的例子。

#include <atomic>
#include <thread>
#include <iostream>
#include <assert.h>
std::atomic<bool> x,y;
std::atomic<int> z;

void write_x()
{
   x.store(true,std::memory_order_seq_release);// x 设置为true
}

void write_y()
{
   y.store(true,std::memory_order_seq_release);// y 设置为true
}

void read_x_then_y()
{
   while(x.load(std::memory_order_seq_acquire) == false);
   if(y.load(std::memory_order_seq_acquire) == true)
   {
       std::cout << “ok1” << std::endl;// x变成true时候y也是true则输出ok1
   }
}

void read_y_then_x()
{
   while(y.load(std::memory_order_seq_acquire) == false);
   if(x.load(std::memory_order_seq_acquire) == true)
   {
       std::cout << “ok2” << std::endl;// y变成true时候x也是true则输出ok2
   }
}

int main()
{
   x = false;
   y = false;
   std::thread a(write_x);
   std::thread b(write_y);
   std::thread c(read_x_then_y);
   std::thread d(read_y_then_x);
   a.join();
   b.join();
   c.join();
   d.join();
}

上面这个例子呢,"ok" 可能不输出。对于线程c来说,读 x 为 true 时,y 可能是 false。与此同时线程d中的 y 为 true 时,x 可能为 false。再总体描述一种情况:线程 a 设置 x 为true,同步给线程 c 但未同步给线程 d,同时线程 b 设置 y 为 true,同步给了线程 d,未同步给线程c。此时线程 c 认为 x 为 ture,y 为 false,线程 d 认为 y 为 true,x 为 false。由于获取-释放序列只保证了部分的先后关系(线程a保证写 x happen-before 线程 c 读 x,不保证 y 的;线程 b和d情况上),因此这是一种部分的同步。

由于获取-释放序列的部分同步特点,经用在一些有先后顺序的场景下使用,一般比顺序一致性序列要高效。下面是一个非常经典的例子:

#include <atomic>
#include <thread>
#include <assert.h>
std::atomic<bool> x,y;
std::atomic<int> z;
void write_x_then_y()
{
   x.store(true,std::memory_order_relaxed);// x是松散式原子写
   y.store(true,std::memory_order_release);// y是释放式原子写
}
void read_y_then_x()
{
   while(!y.load(std::memory_order_acquire));// y获取式原子读
   if(x.load(std::memory_order_relaxed))// x 松散式原子读
       ++z;// 如果y变成ture时 x已经变成 true 则z != 0
}

int main() {
   x=false;
   y=false;
   z=0;
   std::thread a(write_x_then_y);
   std::thread b(read_y_then_x);
   a.join();
   b.join();
   assert(z.load()!=0);
}

以上代码,断言永远不会发生。写 y 保证了写 x 发生于写 y 之前,写 y 和读 y 是同步的,因此在读 y 之前 x 已经被修改(符合传递律)。如上面说展示的,只有acquire读和release写才会形成这种同步。

3.3.4 数据依赖(data dependency )

相比于同步关系,std::memory_order_consume 提供了是相对弱一些的内存同步。这种弱同步被称为是数据依赖(data dependency)。C++中的数据依赖分两种:

(1)carrary-a-dapendency-to:如果操作A的结果在操作B中被用作操作数,则: A携带一个依赖项到B。

(2)dependency-ordered-before:如果读操作B的结果在同一个线程中的进一步操作C中使用,那么A的写操作:

(std::memory_order_release, std::memory_order_acq_rel,或std::memory_order_seq_cst)

的依赖序列在读操作B (std::memory_order_consume)之前。

对于 std::memory_order_consume 标记的一种重要的使用场景就是原子操作一个指针,这个指针指向一个内存区域。

#include <atomic>
#include <cassert>
#include <string>
#include <thread>

struct X {
 int i_;
 std::string s_;
};

std::atomic<int> a;
std::atomic<X*> p;

void create_x() {
 X* x = new X;
 x->i_ = 42;
 x->s_ = “hello”;
 a.store(99, std::memory_order_relaxed);
 p.store(x, std::memory_order_release);// x 和 p 构成
}

void use_x() {
 X* x;
 while (!(x = p.load(std::memory_order_consume)));//
 assert(x->i_ == 42);
 assert(x->s_ == “hello”);
 assert(a.load(std::memory_order_relaxed) == 99);
 
}

int main() {
 std::thread t1(create_x);
 std::thread t2(use_x);
 t1.join();
 t2.join();
 return 0;
}

以上代码

assert(a.load(std::memory_order_relaxed)==99) 是可能触发断言的。线程 t1 对 p 原子写,线程  t2 对 p 原子读(标记为consume),则 t2 原子读 p 依赖于 t1 原子写。在t2中,x 指针 = p指针,则 p 携带了一个依赖到了 x 指针,则 t1 对于 p1 的原子写发生于 x 指针赋值p指针之前。以上的一个推到关系保证了前面两个assert均不会失败。而 a 的值并没这层保证,所以可能会触发第三个断言。可以这么说,只约束对和 p 有关的数据,无关的不约束。

3.3.5 栅栏(fences)

栅栏是一种全局的操作会影响线程内的其他原子操作的顺序。栅栏也常被称为内存屏障(memory barriers),因为这种操作会在代码里加入一行使得某些操作不能跨越。栅栏限制了编译器或者硬件对于不相关变量的重排自由,并且引入了之前不存在happens-before 和 synchronizes-with关系。C++11 原子库定义了可移植的函数 std::atomic_thread_fence() ,该函数接收一个参数用于指定栅栏的类型。

#include <atomic>
#include <thread>
#include <iostream>
#include <assert.h>
std::atomic<bool> x,y;

void write_x_then_y()
{
   x.store(true,std::memory_order_relaxed);// x 设置为true 在 fence之前
   std::atomic_thread_fence(std::memory_order_release);
   y.store(true,std::memory_order_relaxed);// y 设置为true 在 fence之后
}

void read_y_then_x()
{
   while(y.load(std::memory_order_relaxed) == false);// 自旋 直到 y 被设置为true
   std::atomic_thread_fence(std::memory_order_acquire);
   if(x.load(std::memory_order_relaxed) == true)// x 已经在 y 之前被设置为 true
   {
       std::cout << “ok” << std::endl;// y变成true时候x也是true则输出ok
   }
}

int main()
{
   x = false;
   y = false;
   std::thread a(write_x_then_y);
   std::thread b(read_y_then_x);
   a.join();
   b.join();
}

上面这个例子 "ok"一定会输出。std::atomic_thread_fence(std::memory_order_release) 和 std::atomic_thread_fence(std::memory_order_acquire)保证了store x happen before load x。当然 y.store() releasey.load() acquire 也是一样的,这里只是用来举例子。

使用栅栏可以使非原子操作有序化对上面的例子进行一个简单修改,实现的效果是一致的。虽然对x的操作是非原子的,但是栅栏保证了它在 y 写之前被写 y 读之前被读。当然这里对 y 的操作必须是原子性的,如果写 y 的同时读 y 这个过程的结果是未定义。

#include <atomic>
#include <thread>
#include <iostream>
#include <assert.h>
bool x;
std::atomic<bool> y;

void write_x_then_y()
{
   x = true;// x 设置为true
   std::atomic_thread_fence(std::memory_order_release);
   y.store(true,std::memory_order_relaxed);// y 设置为true
}

void read_y_then_x()
{
   while(y.load(std::memory_order_relaxed) == false);
   std::atomic_thread_fence(std::memory_order_acquire);
   if(x)
   {
       std::cout << “ok” << std::endl;// y变成true时候x也是true则输出ok
   }
}

int main()
{
   x = false;
   y = false;
   std::thread a(write_x_then_y);
   std::thread b(read_y_then_x);
   a.join();
   b.join();
}

在 happens-before关系到来之前 sequenced-before关系已经确定,这对于使用原子操作确定非原子操作序列是很重要的。如果同一线程中非原子操作在序列在原子操作之前,本线程原子操作 happens-before 另外一个线程原子操作,则该线程的非原子操作happens-before 另外一个线程的原子操作。使用C++11标准库提供的更高等级的同步工具,例如互斥锁和条件变量也是可以实现的。

四、写在后面

本文主要参考《C++ Concurrency In Action》,并对其内容做了提炼总结。如果大家对C++并发有兴趣,非常推荐大家去拜读原版《C++ Concurrency In Action》,里面有非常多的无锁编程模型。

后面预告一个新的面向对象的开发模型,和我们的阿里云服务器同名,简称就叫ECS(Entity Component System),它被多用于游戏引擎当中,后面我将这个引入到了后台游戏业务开发中,可用于战斗服和大地图等,请期待后面的讲解。

参考链接:

[1]https://blog.csdn.net/K346K346/article/details/85345477

[2]https://blog.csdn.net/ji2581072/article/details/139616941


ALB实现跨地域负载均衡


当客户业务遍及多地,并且在阿里云多个地域均部署了服务时,使用负载均衡结合云企业网及转发路由器可实现应用跨地域级别负载均衡。    


点击阅读原文查看详情。


不同内存序的根本区别在于它们对内存操作顺序的约束程度。顺序一致性最强,所有线程看到的操作顺序都相同;松散序列最弱,几乎不施加任何约束。实际应用中,选择合适的内存序需要权衡性能和正确性。如果对一致性要求不高,可以优先选择性能更高的松散序列;反之,顺序一致性能够保证结果的正确性。

不推荐使用自定义原子类型的主要原因是标准库的原子类型已经足够好了,而且自定义原子类型容易出错,调试也难。除非你对C++内存模型和原子操作有非常深入的了解,并且有非常特殊的场景,否则不要轻易尝试自定义原子类型。毕竟,程序员的时间比CPU的时间值钱。

无锁并发最大的优势当然是性能高,因为它避免了锁的开销。但在实际项目中,选择哪种并发方式取决于多种因素,例如数据的竞争程度、代码的复杂度以及开发人员的经验。如果竞争不激烈且开发人员对无锁编程有丰富经验,那么无锁并发可能是最佳选择。反之,使用锁的并发方式更安全可靠,即使性能略有下降。

无锁的优势就是快,没有上下文切换和内核态的开销,但缺点是容易出错,调试也难,而且扩展性差,只适合于简单的场景。加锁的并发虽然有性能开销,但是实现和调试都比较简单。实际项目中优先考虑用锁,毕竟程序员的时间比CPU的时间值钱。只有在性能瓶颈非常明显,并且锁的开销无法接受的情况下,才考虑无锁化并发,并且需要进行充分的测试和验证。