0%

CPP并发_04同步并发操作

C++ 并发编程学习02

  • 等待事件

  • 带有期望值的等待一次性事件

  • 在限定时间内等待

  • 使用同步操作简化代码

4.1 等待一个事件或其他条件

4.1.1 等待条件达成

使用条件变量唤醒休眠中的线程对其进行处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
std::mutex mut;
std::queue<data_chunk> data_queue; // 1, 在两线程之间传递数据的队列
std::condition_variable data_cond;

void data_preparation_thread()
{
while(more_data_to_prepare())
{
data_chunk const data=prepare_data();
std::lock_guard<std::mutex> lk(mut);
data_queue.push(data); // 2 将准备好的数据压入队列中
data_cond.notify_one(); // 3 对等待的线程(如果有等待线程)进行通知
}
}

// 处理数据的线程
void data_processing_thread()
{
while(true)
{
// 在这个线程中首先对互斥量上锁
std::unique_lock<std::mutex> lk(mut); // 4
// 传递一个锁和lambda 函数表达式作为等待条件
data_cond.wait(lk,[]{return !data_queue.empty();}); // 5
data_chunk data=data_queue.front();
data_queue.pop();
lk.unlock(); // 6
process(data);
if(is_last_chunk(data))
break;
}
}

wait()会去检查这些条件(通过调用所提供的lambda函数),当条件满足(lambda函数返回true)时返回。如果条件不满足(lambda函数返回false),wait()函数将解锁互斥量,并且将这个线程(上段提到的处理数据的线程)置于阻塞或等待状态。

4.1.2 使用条件变量构建线程安全的队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
#include <mutex>
#include <condition_variable>
#include <queue>
#include <memory>

template<typename T>
class threadsafe_queue
{
private:
mutable std::mutex mut;
std::queue<T> data_queue;
std::condition_variable data_cond;
public:
threadsafe_queue()
{}
threadsafe_queue(threadsafe_queue const& other)
{
std::lock_guard<std::mutex> lk(other.mut);
data_queue=other.data_queue;
}

void push(T new_value)
{
std::lock_guard<std::mutex> lk(mut);
data_queue.push(new_value);
data_cond.notify_one();
}

void wait_and_pop(T& value)
{
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk,[this]{return !data_queue.empty();});
value=data_queue.front();
data_queue.pop();
}

std::shared_ptr<T> wait_and_pop()
{
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk,[this]{return !data_queue.empty();});
std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
data_queue.pop();
return res;
}

bool try_pop(T& value)
{
std::lock_guard<std::mutex> lk(mut);
if(data_queue.empty)
return false;
value=data_queue.front();
data_queue.pop();
return true;
}

std::shared_ptr<T> try_pop()
{
std::lock_guard<std::mutex> lk(mut);
if(data_queue.empty())
return std::shared_ptr<T>();
std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
data_queue.pop();
return res;
}

bool empty() const
{
std::lock_guard<std::mutex> lk(mut);
return data_queue.empty();
}
};

int main()
{}

不多解释。。。

4.2 使用期望值等待一次性事件

假设你乘飞机去国外度假。当你到达机场并办理完各种登机手续后,还需要等待机场广播通知登机时(可能要等很多个小时),你可能会在候机室里面找一些事情来打发时间,比如:读书,上网,或者来一杯价格不菲的机场咖啡。不过,从根本上来说你就在等待一件事情:机场广播能够登机的时间。之前飞机的班次在之后没有可参考性,因为当你再次度假的时候,可能会选择等待另一班飞机。

C++标准库模型将这种一次性事件称为 期望值(future) 。当线程需要等待特定的一次性事件时,某种程度上来说就需要知道这个事件在未来的期望结果。

之后,这个线程会周期性(较短的周期)的等待或检查,事件是否触发(检查信息板);检查期间也会执行其他任务(品尝昂贵的咖啡)。另外,等待任务期间它可以先执行另外一些任务,直到对应的任务触发,而后等待期望值的状态会变为就绪(ready)。一个期望值可能是数据相关的(比如,你的登机口编号),也可能不是。当事件发生时(并且期望状态为就绪),并且这个期望值就不能被重置。

4.2.1 后台任务的返回值

当不着急要任务结果时,可以使用std::async启动一个异步任务。与 std::thread对象等待的方式不同, std::async会返回一个std::future对象,这个对象持有最终计算出来的结果。当需要这个值时,只需要调用这个对象的 get() 成员函数;并且会阻塞线程直到期望值状态为就绪为止;之后,返回计算结果。

std::async: 函数模板,启动一个异步任务,返回 std::future类模板对象;

std::future<T>: 类模板,常用函数,T 是返回结果的类型;

  • get(): 等待,直到获取子线程返回值,解除堵塞。 只能使用一次, 因为get() 是用移动语义实现的,使用 future.get() 后,再次使用 future.get() 将变成 nullptr;

  • wait(): 等待,直到子线程结束,不需要返回值,解除堵塞;

例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#include <iostream>
#include <thread>
#include <future>

int myThread() {
std::cout << "subThread id: " << std::this_thread::get_id() << "...subThread start.\n";
std::chrono::milliseconds time(5000);
std::this_thread::sleep_for(time);
std::cout << "subThread id: " << std::this_thread::get_id() << "...subThread end.\n";
return 7; //线程函数必须要返回值,get()会一直堵塞等待返回值
}
int main(int argc, char const *argv[]) {
std::cout << "main Thread id: " << std::this_thread::get_id() << "...run...\n";
std::future<int> retVal = std::async(myThread); //线程开始执行,虽然线程函数会延迟5s,但是不会卡在这儿
for (size_t i = 0; i < 10; i++)
std::cout << "test..." << i << std::endl;
//如果线程函数没有执行结束,而是会卡在这里,因为这里需要这个线程函数的返回结果
std::cout << "subThread return value: " << retVal.get() << std::endl;
return 0;
}

输出效果:

虽然线程函数里面有个 5秒的延时操作,主线程并不会等待这个子线程的完成才继续执行下去,只有 当主线程需要用到子线程的数据时 ,才会等到子线程的结束,比如 retVal.get()会使得主线程必须等到子线程结束。

因此 future,可以理解为让主线程在将来用到子线程返回时才等待,否则不等。这个功能类似于 thread::join(),让主线程等到子线程的完成,但是这个异步性质,可以等价准备的控制等待事件,即,在哪等待,比如:

1
2
3
4
std::future<int> retVal = std::async(myThread);
for (size_t i = 0; i < 1000; i++) std::cout<<"test_1..."<<i<<std::endl;
std::cout<<"\nsubThread return value: "<<retVal.get()<<std::endl;
for (size_t i = 0; i < 1000; i++) std::cout<<"test_2..."<<i<<std

第一个for循环,会和子线程争夺资源,但是第二个线程必须等子线程结束才能执行,如果使用join(),第一for循环结束,第二个就会执行。使用异步性质,可以控制程序的执行顺序。

可以在函数调用之前向 std:async 传递一个额外的参数,这个参数的类型是std::launch,这是一个枚举类型:

1
2
3
4
5
  enum class launch
    {
        async = 1,
        deferred = 2
    };

std::launch::defered,表明函数调用被延迟到wait()或get()函数调用时才执行;

std::launch::async, 表明函数必须在其所在的独立线程上执行;

4.2.2 任务与期望值关联

std::packaged_task是个类模板,模板参数是各种可调用对象,通过std::packaged_task把各种可调用对象包装起来,以作为线程入口函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
int myThread(int val){
    /***代码***/
}

int main(int argc, char const *argv[])
{
std::cout<<"current id: "<<std::this_thread::get_id()<<std::endl;
    std::packaged_task<int(int)> pkg(myThread);
    std::thread trd(std::ref(pkg), 10);
    trd.join();
    std::future<int> ret = pkg.get_future();
    std::cout<<ret.get()<<std::endl;
    std::cout<<"Main thread.\n";
}

4.2.3 使用 std::promises

当一个应用需要处理很多网络连接时,它会使用不同线程尝试连接每个接口,因为这能使网络尽早联通,尽早执行程序。当连接较少的时候,工作没有问题(也就是线程数量比较少)。不幸的是,随着连接数量的增长,这种方式变的越来越不合适;因为大量的线程会消耗大量的系统资源,还有可能造成线程上下文频繁切换(当线程数量超出硬件可接受的并发数时),这都会对性能有影响。最极端的例子:系统资源被创建的线程消耗殆尽,系统连接网络的能力会变的极差。因此通过少数线程(可能只有一个)处理网络连接,每个线程同时处理多个连接事件,对需要处理大量的网络连接的应用而言是普遍的做法。

使用一对 std::promise<bool>/std::future<bool> 实现 单线程处理多接口,能够在某个线程中给它赋值,然后在其他线程中,把这个值取出来。

```cpp
//这个线程计算
void mythread(std::promise& prom, int val){
val++;
//假设这个线程花了2s, 得到了运算结果
prom.set_value(val);
return;
}

//这个线程使用上面那个线程的计算结果
void myTrd(std::future& future){
int ret = future.get();
std::cout<<”myTrd val: “< prom; //int 为保存的数据类型,通过这个prom实现两个线程的数据交互
std::future future = prom.get_future(); // 获取结果值
std::thread trd(mythread, std::ref(prom), 10);
std::thread trd2(myTrd, std::ref(future)); // 传递给第二个线程
std::cout<<”main thread.\n”;
trd.join();
trd2.join();
return 0;
}
``