0%

CPP并发_02线程管理

C++ 并发编程学习02 启动新线程,等待线程与分离线程。

启动一个线程,等待这个线程结束,或放在后台运行。

再看看怎么给已经启动的线程函数传递参数,以及怎么将一个线程的 所有权 从当前std::thread对象移交给另一个。

最后,再来确定线程数,以及识别特殊线程。

02 线程管理

2.1 线程管理的基础

等待线程完成

join() 是简单粗暴的等待线程完成或不等待。调用join()还清理了线程相关的存储部分,这样std::thread对象将不再与已经完成的线程有任何关联,这意味着只能对 一个线程 使用 一次 join(),一旦使用过,std::thread对象就不能再次加入。

后台运行线程

使用detach()会让线程在后台运行,这就意味着主线程不能与之产生直接交互。

分离线程 也被称为 守护线程(daemon threads), UNIX中守护线程是指,没有任何显式的用户接口,并在后台运行的线程。其特点就是长时间运行,线程的生命周期可能会从某一个应用起始到结束,可能会在后台监视文件系统,还有可能对缓存进行清理,亦或对数据结构进行优化。

调用std::thread成员函数detach()来分离一个线程。之后,相应的std::thread对象就与实际执行的线程无关了,并且这个线程也无法加入:

1
2
3
std::thread t(do_background_work);
t.detach();
assert(!t.joinable());

不能对没有执行线程的std::thread对象使用detach()或者join(),并且要用同样的方式进行检查——当std::thread对象使用t.joinable()返回的是true,就可以使用t.detach()

使用场景:

让一个文字处理应用同时编辑多个文档。无论是用户界面,还是在内部应用内部进行,都有很多的解决方法。虽然,这些窗口看起来是完全独立的,每个窗口都有自己独立的菜单选项,但他们却运行在同一个应用实例中。一种内部处理方式是,让每个文档处理窗口拥有自己的线程;每个线程运行同样的的代码,并隔离不同窗口处理的数据。如此这般,打开一个文档就要启动一个新线程。因为是对独立的文档进行操作,所以没有必要等待其他线程完成。因此,这里就可以让文档处理窗口运行在分离的线程上

2.2 向线程函数传递参数

在默认情况下,这些参数会被拷贝至 新线程独立内存空间 中,以供新线程访问,并如同临时变量一样作为右值传递给可调用对象或函数。即使函数中的参数是 引用 的形式:

1
2
void f(int i, std::string const& s); // 指针地址不可修改
std::thread t(f, 3, "hello"); //向线程函数传递参数

如果线程函数期待传入一个引用,可以使用std::ref将参数转换成引用的形式:

1
2
3
void update_data_for_widget(widget_id w,widget_data& data); //线程函数期待传入一个引用
widget_data data;
std::thread t(update_data_for_widget,w,std::ref(data));

从而,update_data_for_widget就会接收到一个data变量的引用,而非data变量的拷贝副本。

上面两个例子的区别在于前者期待传入的 常量引用,而后者期待传入引用,即希望可以对其进行修改。

执行线程的所有权可以多个std::thread实例中互相转移,这是依赖于std::threa实例的 可移动性不可复制性不可复制性 保证了在同一时间点,一个std::thread实例只能关联一个执行线程; 可移动性 使得开发者可以自己决定,哪个实例拥有实际执行线程的所有权。

2.3 转移线程所有权

假设要写一个在后台启动线程的函数,并想通过新线程返回的所有权去调用这个函数,而不是等待线程结束再去调用。或完全与之相反的想法:创建一个线程,并在函数中转移所有权,都必须要等待线程结束。所以,新线程的所有权都需要转移。

这就需要将 移动操作 引入std::thread

下面的例子,创建了两个执行线程,并且在std::thread实例之间(t1, t2, t3)转移所有权。

1
2
3
4
5
6
7
8
void some_function();
void some_other_function();
std::thread t1(some_function); // 1
std::thread t2=std::move(t1); // 2
t1=std::thread(some_other_function); // 3
std::thread t3; // 4
t3=std::move(t2); // 5
t1=std::move(t3); // 6 赋值操作将使程序崩溃

1: 新线程开始与t1相关联;
2: 显式使用std::move()创建 t2后 , t1 的所有权就转移给了 t2 ,之后执行线程就和 t2 没有关联了;
3: 一个临时std::thread对象相关的线程启动了。为什么不显式调用std::move()转移所有权呢?因为,所有者是一个 临时对象 ——移动操作将会 隐式的调用
4: 使用默认构造方式创建 t3, 没有与任何线程相关联;
5: 调用std::move()将与t2关联线程的所有权转移到t3中。因为t2是一个命名对象,需要显式的调用std::move()。移动操作 #5 完成后,t1与执行some_other_function的线程相关联,t2与任何线程都无关联,t3与执行some_function的线程相关联。
6: 最后一个移动操作,将some_function线程的所有权转移给t1。不过,t1已经有了一个关联的线程(执行some_other_function的线程),所以这里系统直接调用std::terminate()终止程序继续运行。

2.4 运行时决定线程数量

std::thread::hardware_concurrency() 这个函数会返回能并发在一个程序中的线程数量。例如,多核系统中,返回值可以是CPU核芯的数量。返回值也仅仅是一个提示,当系统信息无法获取时,函数也会返回0。

例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
#include <thread>
#include <numeric>
#include <algorithm>
#include <functional>
#include <vector>
#include <iostream>

//模板类,使下面的结构体可以适配更多的数据类型
template<typename Iterator, typename T>
struct accumulate_block //结构体
{
void operator()(Iterator first, Iterator last, T& result)
{
result = std::accumulate(first, last, result);
}
};

template<typename Iterator, typename T> //typename 就是定义的传入的参数的类型, int,float,string等
//first 和 last都是迭代器类型,即 vi.begin(), vi.end()
T parallel_accumulate(Iterator first, Iterator last, T init)
{
// 计算vector中数据的个数, 前面的long占了 4各字节即32位
unsigned long const length = std::distance(first, last);
std::cout << "num length: " << length << std::endl;

if (!length)
return init;
//定义每个线程的最小任务数为 25,可以自己定义
unsigned long const min_per_thread = 25;
// 根据上面的每个线程最小任务数,计算最多需要多少个线程
unsigned long const max_threads = (length + min_per_thread - 1) / min_per_thread;
std::cout << "max threads: " << max_threads << std::endl;
// 计算硬件所能支持的线程束
unsigned long const hardware_threads = std::thread::hardware_concurrency();
std::cout << "hardware threads : " << hardware_threads << std::endl;
//选择上面两者较小的值作为最后的线程数量
//因为上下文频繁的切换会降低线程的性能,
//所以你肯定不想启动的线程数多于硬件支持的线程数量,
//如果hardware_threads=0,则最好就别进行多线程操作了,选个小的值比如2就行
unsigned long const num_threads = std::min(hardware_threads != 0 ? hardware_threads : 2, max_threads);
std::cout << "num threads: " << num_threads << std::endl;
// 最后更新,每个线程中处理的元素数量,即上面的每个线程的任务数
unsigned long const block_size = length / num_threads;
std::cout << "block size: " << block_size << std::endl;

std::vector<T> results(num_threads);
//存放线程的容器,启动的线程数要比 num_threads,因为启动之前已经有一个主线程了
std::vector<std::thread> threads(num_threads - 1);

Iterator block_start = first;
// 使用简单的循环启动线程,注意因为最后一个块可能不是完整的,
//所以计算前 n-1 个,最后一个block 单独计算
for (unsigned long i = 0; i < (num_threads - 1); ++i)
{
Iterator block_end = block_start;
// block_end 移动到当前块的末尾,当前块的大小即每个线程中的任务数
std::advance(block_end, block_size);
// 启动线程函数,并传入参数
// 将原来的数据拆分成小的 block 然后多线程进行计算,
//传入的 block_start 和 block_end 都是迭代器, 最后的结果通过引用保存到 results 中
// std::ref 就是前面的提到的,将引用作为参数传递的一种方法,目的是用来保存线程计算的结果
threads[i] = std::thread(accumulate_block<Iterator, T>(),block_start, block_end, std::ref(results[i]));
block_start = block_end;
}
// 计算最后一个 block的结果,因为可能其大小不是 block_size
accumulate_block<Iterator, T>()(block_start, last, results[num_threads - 1]);
//等待线程完成 std::men_fn 为函数适配器
std::for_each(threads.begin(), threads.end(),std::mem_fn(&std::thread::join));
//for (auto& entry : threads) //和上面的效果等价
// entry.join(); // 10

// std::accumulate进行累加, begin, end, init, init就是在前面累加结果的基础加上
return std::accumulate(results.begin(), results.end(), init);
}

int main()
{
std::vector<int> vi;
for (int i = 0; i < 60; ++i)
{
vi.push_back(10);
}
int sum = parallel_accumulate(vi.begin(), vi.end(), 5);
std::cout << "sum=" << sum << std::endl;
return 0;
}

输出效果:

如果 main() 改成如下:

1
2
3
4
5
6
7
8
9
10
11
int main()
{
std::vector<int> vi;
for (int i = 0; i < 71; ++i)
{
vi.push_back(i);
}
int sum = parallel_accumulate(vi.begin(), vi.end(), 5);
std::cout << "sum=" << sum << std::endl;
return 0;
}

结果为:

主要经历了下面的几个步骤:

2.5 标识线程

线程标识类型为std::thread::id,可以通过两种方式进行检索,

  1. 通过调用std::thread对象的成员函数get_id()来获取

  2. 当前线程中调用std::this_thread::get_id()

线程ID 可以在容器中作为键值,如,容器可以存储其掌控下每个线程的信息,或在多线程中互传信息。