C++ 并发编程学习02
等待事件
带有期望值的等待一次性事件
在限定时间内等待
使用同步操作简化代码
4.1 等待一个事件或其他条件 4.1.1 等待条件达成 使用条件变量唤醒休眠中的线程对其进行处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 std ::mutex mut;std ::queue <data_chunk> data_queue; std ::condition_variable data_cond;void data_preparation_thread () { while (more_data_to_prepare()) { data_chunk const data=prepare_data(); std ::lock_guard<std ::mutex> lk(mut); data_queue.push(data); data_cond.notify_one(); } } void data_processing_thread () { while (true ) { std ::unique_lock<std ::mutex> lk(mut); data_cond.wait(lk,[]{return !data_queue.empty();}); data_chunk data=data_queue.front(); data_queue.pop(); lk.unlock(); process(data); if (is_last_chunk(data)) break ; } }
wait()会去检查这些条件(通过调用所提供的lambda函数),当条件满足(lambda函数返回true)时返回。如果条件不满足(lambda函数返回false),wait()函数将解锁互斥量,并且将这个线程(上段提到的处理数据的线程)置于阻塞或等待状态。
4.1.2 使用条件变量构建线程安全的队列 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 #include <mutex> #include <condition_variable> #include <queue> #include <memory> template <typename T>class threadsafe_queue { private : mutable std ::mutex mut; std ::queue <T> data_queue; std ::condition_variable data_cond; public : threadsafe_queue() {} threadsafe_queue(threadsafe_queue const & other) { std ::lock_guard<std ::mutex> lk(other.mut); data_queue=other.data_queue; } void push (T new_value) { std ::lock_guard<std ::mutex> lk(mut); data_queue.push(new_value); data_cond.notify_one(); } void wait_and_pop (T& value) { std ::unique_lock<std ::mutex> lk(mut); data_cond.wait(lk,[this ]{return !data_queue.empty();}); value=data_queue.front(); data_queue.pop(); } std ::shared_ptr <T> wait_and_pop() { std ::unique_lock<std ::mutex> lk(mut); data_cond.wait(lk,[this ]{return !data_queue.empty();}); std ::shared_ptr <T> res(std ::make_shared<T>(data_queue.front())); data_queue.pop(); return res; } bool try_pop (T& value) { std ::lock_guard<std ::mutex> lk(mut); if (data_queue.empty) return false ; value=data_queue.front(); data_queue.pop(); return true ; } std ::shared_ptr <T> try_pop() { std ::lock_guard<std ::mutex> lk(mut); if (data_queue.empty()) return std ::shared_ptr <T>(); std ::shared_ptr <T> res(std ::make_shared<T>(data_queue.front())); data_queue.pop(); return res; } bool empty () const { std ::lock_guard<std ::mutex> lk(mut); return data_queue.empty(); } }; int main () {}
不多解释。。。
4.2 使用期望值等待一次性事件
假设你乘飞机去国外度假。当你到达机场并办理完各种登机手续后,还需要等待机场广播通知登机时(可能要等很多个小时),你可能会在候机室里面找一些事情来打发时间,比如:读书,上网,或者来一杯价格不菲的机场咖啡。不过,从根本上来说你就在等待一件事情:机场广播能够登机的时间。之前飞机的班次在之后没有可参考性,因为当你再次度假的时候,可能会选择等待另一班飞机。
C++标准库模型将这种一次性事件称为 期望值(future) 。当线程需要等待特定的一次性事件时,某种程度上来说就需要知道这个事件在未来的期望结果。
之后,这个线程会周期性(较短的周期)的等待或检查,事件是否触发(检查信息板);检查期间也会执行其他任务(品尝昂贵的咖啡)。另外,等待任务期间它可以先执行另外一些任务,直到对应的任务触发,而后等待期望值的状态会变为就绪(ready)。一个期望值可能是数据相关的(比如,你的登机口编号),也可能不是。当事件发生时(并且期望状态为就绪),并且这个期望值就不能被重置。
4.2.1 后台任务的返回值 当不着急要任务结果时,可以使用std::async
启动一个异步任务。与 std::thread
对象等待的方式不同, std::async
会返回一个std::future
对象,这个对象持有最终计算出来的结果。当需要这个值时,只需要调用这个对象的 get() 成员函数;并且会阻塞线程直到期望值状态为就绪为止;之后,返回计算结果。
std::async
: 函数模板,启动一个异步任务,返回 std::future
类模板对象;
std::future<T>
: 类模板,常用函数,T 是返回结果的类型;
例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 #include <iostream> #include <thread> #include <future> int myThread () { std ::cout << "subThread id: " << std ::this_thread::get_id() << "...subThread start.\n" ; std ::chrono::milliseconds time (5000 ) ; std ::this_thread::sleep_for(time); std ::cout << "subThread id: " << std ::this_thread::get_id() << "...subThread end.\n" ; return 7 ; } int main (int argc, char const *argv[]) { std ::cout << "main Thread id: " << std ::this_thread::get_id() << "...run...\n" ; std ::future <int > retVal = std ::async(myThread); for (size_t i = 0 ; i < 10 ; i++) std ::cout << "test..." << i << std ::endl ; std ::cout << "subThread return value: " << retVal.get() << std ::endl ; return 0 ; }
输出效果:
虽然线程函数里面有个 5秒的延时操作,主线程并不会等待这个子线程的完成才继续执行下去,只有 当主线程需要用到子线程的数据时 ,才会等到子线程的结束,比如 retVal.get()
会使得主线程必须等到子线程结束。
因此 future
,可以理解为让主线程在将来用到子线程返回时才等待,否则不等。这个功能类似于 thread::join()
,让主线程等到子线程的完成,但是这个异步性质,可以等价准备的控制等待事件,即,在哪等待,比如:
1 2 3 4 std ::future <int > retVal = std ::async(myThread);for (size_t i = 0 ; i < 1000 ; i++) std ::cout <<"test_1..." <<i<<std ::endl ;std ::cout <<"\nsubThread return value: " <<retVal.get()<<std ::endl ;for (size_t i = 0 ; i < 1000 ; i++) std ::cout <<"test_2..." <<i<<std
第一个for循环,会和子线程争夺资源,但是第二个线程必须等子线程结束才能执行,如果使用join(),第一for循环结束,第二个就会执行。使用异步性质,可以控制程序的执行顺序。
可以在函数调用之前向 std:async
传递一个额外的参数,这个参数的类型是std::launch
,这是一个枚举类型:
1 2 3 4 5 enum class launch { async = 1 , deferred = 2 };
std::launch::defered
,表明函数调用被延迟到wait()或get()函数调用时才执行;
std::launch::async
, 表明函数必须在其所在的独立线程上执行;
4.2.2 任务与期望值关联 std::packaged_task
是个类模板,模板参数是各种可调用对象,通过std::packaged_task
把各种可调用对象包装起来,以作为线程入口函数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 int myThread (int val) { } int main (int argc, char const *argv[]) { std ::cout <<"current id: " <<std ::this_thread::get_id()<<std ::endl ; std ::packaged_task<int (int )> pkg(myThread); std ::thread trd (std ::ref(pkg), 10 ) ; trd.join(); std ::future <int > ret = pkg.get_future(); std ::cout <<ret.get()<<std ::endl ; std ::cout <<"Main thread.\n" ; }
4.2.3 使用 std::promises
当一个应用需要处理很多网络连接时,它会使用不同线程尝试连接每个接口,因为这能使网络尽早联通,尽早执行程序。当连接较少的时候,工作没有问题(也就是线程数量比较少)。不幸的是,随着连接数量的增长,这种方式变的越来越不合适;因为大量的线程会消耗大量的系统资源,还有可能造成线程上下文频繁切换(当线程数量超出硬件可接受的并发数时),这都会对性能有影响。最极端的例子:系统资源被创建的线程消耗殆尽,系统连接网络的能力会变的极差。因此通过少数线程(可能只有一个)处理网络连接,每个线程同时处理多个连接事件,对需要处理大量的网络连接的应用而言是普遍的做法。
使用一对 std::promise<bool>/std::future<bool>
实现 单线程处理多接口 ,能够在某个线程中给它赋值,然后在其他线程中,把这个值取出来。
```cpp //这个线程计算 void mythread(std::promise& prom, int val){ val++; //假设这个线程花了2s, 得到了运算结果 prom.set_value(val); return; }
//这个线程使用上面那个线程的计算结果 void myTrd(std::future& future){ int ret = future.get(); std::cout<<”myTrd val: “< prom; //int 为保存的数据类型,通过这个prom实现两个线程的数据交互 std::future future = prom.get_future(); // 获取结果值 std::thread trd(mythread, std::ref(prom), 10); std::thread trd2(myTrd, std::ref(future)); // 传递给第二个线程 std::cout<<”main thread.\n”; trd.join(); trd2.join(); return 0; } ``