涉及线程管理、同步(互斥锁、条件变量)以及线程本地存储
Boost多线程 来源
线程管理 如何等待一个不同的线程 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 #include <boost/thread.hpp> #include <iostream> void wait (int seconds) { boost::this_thread::sleep(boost::posix_time::seconds(seconds)); } void threadFun () { for (int i = 0 ; i < 5 ; ++i) { wait(1 ); std ::cout << i << std ::endl ; } } int main () { boost::thread t (threadFun) ; t.join(); return 0 ; }
1秒间隔输出:
一个特定的线程可以通过诸如 t 的变量访问,通过这个变量等待着它的使用 join()
方法终止。 上述例子中,使用一个循环把 5 个数字写入标准输出流。使用 wait
执行延迟,wait
调用sleep
函数,sleep
来自Boost.thread
,位于boost::this_thread
命名空间内, sleep()
在预计的一段时间或一个特定的时间点后才让线程继续执行。
关于阻塞调用和非阻塞调用:
阻塞调用:
调用结果返回前,当前线程会被挂起,调用线程只有在得到结果之后才返回;
非阻塞调用:
在不能立刻得到结果之前,该调用不会阻塞当前线程。
例子:
打电话给书店老板问有没有《分布式系统》的书,如果是阻塞调用,你会一直把自己“挂起”,直到得到这本书有没有的结果, 如果是非阻塞式调用,你不管老板有没有告诉你,你自己先一边玩去了,偶尔过几分钟 check 一下老板有没有结果。
这里的阻塞和非阻塞与是否同步异步无关,根老板通过什么方式回答你无关。
同步和异步:
关注的是 消息通信机制
同步:
就是在发布一个 调用 时,在没有得到结果之前,该 调用 就不返回,但是一旦调用返回,就得到返回值了,也就是调用者 主动等待 这个调用结果。
异步:
调用发布后,这个调用就直接返回了,所以没有返回结果。 也就是当一个异步过程调用发出后,调用者不会立刻得到结果,而是在调用发出后,被调用者通过状态、通知来通知调用者,或者通过回调函数处理这个调用。
例子:
你打电话问书店老板有没有《分布式系统》这本书,如果是同步通信机制,书店老板会说,你稍等,”我查一下",然后开始查啊查,等查好了(可能是5秒,也可能是一天)告诉你结果(返回结果)。而异步通信机制,书店老板直接告诉你我查一下啊,查好了打电话给你,然后直接挂电话了(不返回结果)。然后查好了,他会主动打电话给你。在这里老板通过“回电”这种方式来回调。
知乎
如何通过中断点让线程中断 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 #include <boost/thread.hpp> #include <iostream> void wait (int seconds) { boost::this_thread::sleep(boost::posix_time::seconds(seconds)); } void threadFun () { try { for (int i = 0 ; i < 5 ; ++i) { wait(1 ); std ::cout << i << std ::endl ; } } catch (boost::thread_interrupted&) { std ::cout << "catch thread interrupted" << std ::endl ; } } int main () { boost::thread t (threadFun) ; wait(3 ); t.interrupt(); t.join(); }
具体的流程如下:
由于在main里3秒后调用 interrupt()方法。 因此,相应的线程被中断,并抛出一个 boost::thread_interrupted 异常。 这个异常在线程内也被正确地捕获,threadFun
函数也在处理完这个异常后返回,线程被终止,这反过来也将终止整个程序,因为main()
一直在等待该线程,所以使用join()
终止该线程
所以最后的输出结果为:
在一个线程上调用interrupt()
会中断相应的线程,同时,中断意味着一个类型为boost::thread_interrupt
的异常,它会在这个线程中抛出,只有在 线程达到中断点时才会发生 ,比如上面代码中的this_thread
或者t.join
就是线程的中断点。
如果给定的线程中不包含任何中断点,简单调用interrupt()
就不会起作用。每当(调用)一个线程中断点,它都会检查interrupt
是否被调用,只有被调用了boost::thread_interrupt
异常才会相应地抛出。上面例子中的sleep()
被调用了 5 次,所以该线程也就检查了五次它是否应该被中断。
思考,如果上面的程序 threadFun
函数中的 wait(1)
和下面的 cout
函数进行位置替换是什么样的?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 void threadFun () { try { for (int i = 0 ; i < 5 ; ++i) { std ::cout << i << std ::endl ; wait(1 ); } } catch (boost::thread_interrupted&) { std ::cout << "catch thread interrupted" << std ::endl ; } }
输出:
流程图如下:
因为输出在wait(1)
前面,所以先进行了输出再进行等待。
线程中断点:
线程 并非 在任何时候都可以中断的,boost的thread库中定义了 若干中断点 ,只有当线程执行到中断点的时候才会被中断,一个线程可以有若干个线程中断点。
thread 库定义了 9 个中断点,它们都是函数:
1 2 3 4 5 6 7 8 9 thread::join(); thread::timed_join(); condition_variable::wait(); condition_variable::timed_wait(); condition_variable_any::wait(); condition_variable_any::timed_wait(); thread::sleep(); this_thread::sleep(); this_thread::interruption_point();
前8个都是某种形式的等待函数,表明线程在阻塞的时候可以被中断。而最后一个this_thread::interruption_point()
;则是一个特殊的中断点函数,它并不等待,只是起到一个标签的作用,表示线程执行到这个地方可以被中断。
参考
同步 虽然多线程的使用可以提高应用程序的性能,但也增加了复杂性。使用使用线程在同一时间执行几个函数,访问共享资源时必须同步。Boost.Thread
提供了同步线程的类。
boost::mutex 互斥类,lock() 和 unlock() 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 #include <boost/thread.hpp> #include <iostream> void wait (int seconds) { boost::this_thread::sleep(boost::posix_time::seconds(seconds)); } boost::mutex mutex; void threadFun () { for (int i = 0 ; i < 5 ; ++i) { wait(1 ); mutex.lock(); std ::cout << "Thread " << boost::this_thread::get_id() << ": " << i << std ::endl ; mutex.unlock(); } } int main () { std ::cout << "start" << std ::endl ; boost::thread t1 (threadFun) ; boost::thread t2 (threadFun) ; t1.join(); t2.join(); return 0 ; }
输出效果:
这里使用互斥对象进行同步, Boost.Thread
提供多个互斥类, boost::mutex
是最简单的一个,互斥的基本原则是当一个特定的线程拥有资源的时候 防止 其他资源夺取其所有权。一旦释放,其他的线程可以取得所有权。 这将导致线程等待至另一个线程完成处理一些操作,从而相应地释放互斥对象的所有权。
main()
创建两个线程,同时执行 threadFun()
函数。 利用 for 循环,每个线程数到5,因为标准输出流(std::cout
)是一个全局性的被所有线程共享的对象,不能保证其可以安全地从多个线程访问。因此,标准输出流必须同步:在任何时候,只有一个线程可以访问 std::out
不管哪个线程成功调用 lock()
方法,其他所有线程必须等待,直到 unlock()
被调用。
思考:互斥锁放置在不同的位置是否会得到不一样的结果?
进行如下修改:
1 2 3 4 5 6 7 8 9 10 void threadFun () { mutex.lock(); for (int i = 0 ; i < 5 ; ++i) { wait(1 ); std ::cout << "Thread " << boost::this_thread::get_id() << ": " << i << std ::endl ; } mutex.unlock(); }
感觉这个例子能更好说明问题,虽然是两个线程,但是线程2 需要等待线程1 运行结束之后释放 std::cout
的资源之后才会得到 std::cout
的资源,而这正是因为lock
和unlock
的存在才能保证资源之间不相互抢占。
疑问: 当注释掉 lock
和unlock
之后整体输出和上一个例子效果基本一样,但是当再注释到 wait(1)
之后,就会出现std::cout
混乱的现象,猜测是因为等待时间的存在,资源之间的抢占并不严重,但没有这个等待时间,资源之间相互抢占的现象将显现出来。
boost::lock_guard —— 不直接调用 lock() 和 unlock() boost::lock_guard
在其内部构造和析构函数分别自动调用lock()
和unlock()
函数。
只要修改 threadFun
的部分:
1 2 3 4 5 6 7 8 9 10 void threadFun () { for (int i = 0 ; i < 5 ; ++i) { wait(1 ); boost::lock_guard<boost::mutex> lock(mutex); std ::cout << "Thread " << boost::this_thread::get_id() << ": " << i << std ::endl ; } }
放在位置1 和 位置2 上看不出区别。
位置1:
位置2:
一直存在的疑问: 从输出看两个线程是达到了同步输出的效果,但是从输出的线程id看,每次输出顺序是不太一样的,不太清楚原因是什么?难道本来就是这样的?本来期望的是每次两个线程输出的顺序也会一直保持一致。
模板类(Class template): unique_lock 和 shared_lock 类模板和模板类
boost::unique_lock() ——独占锁 独占锁意味着一个互斥量同时只能被一个线程获取。 其他线程必须等待,直到互斥体再次被释放。
官方文档介绍:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 boost::timed_mutex mutex; void threadFun () { for (int i = 0 ; i < 5 ; ++i) { wait(1 ); boost::unique_lock<boost::timed_mutex> lock(mutex, boost::try_to_lock); if (!lock.owns_lock()) lock.timed_lock(boost::get_system_time() + boost::posix_time::seconds(1 )); std ::cout << "Thread " << boost::this_thread::get_id() << ": " << i << std ::endl ; boost::timed_mutex *m = lock.release(); m->unlock(); } }
这个函数显示了三种方法获取一个互斥体:
lock()
,一直等待直到获得一个互斥体;
try_lock()
, 不会等待,只会在互斥体可用的时候才能获得;
timed_lock()
,试图在一定时间内获取互斥体
boost::shared_lock() ——非独占锁 该类 必须和boost::shared_mutex
类型的互斥量结合使用
官方文档介绍:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 #include <boost/thread.hpp> #include <iostream> #include <vector> #include <cstdlib> #include <ctime> void wait (int seconds) { boost::this_thread::sleep(boost::posix_time::seconds(seconds)); } boost::shared_mutex mutex; std ::vector <int > random_numbers;void fill () { std ::srand(static_cast <unsigned int >(std ::time(0 ))); for (int i = 0 ; i < 3 ; ++i) { boost::unique_lock<boost::shared_mutex> lock(mutex); random_numbers.push_back(std ::rand()); lock.unlock(); wait(1 ); } } void print () { for (int i = 0 ; i < 3 ; ++i) { wait(1 ); boost::shared_lock<boost::shared_mutex> lock(mutex); std ::cout << "i : " << i << ", value: " << random_numbers.back() << std ::endl ; } } int sum = 0 ;void count () { for (int i = 0 ; i < 3 ; ++i) { wait(1 ); boost::shared_lock<boost::shared_mutex> lock(mutex); sum += random_numbers.back(); } } int main () { boost::thread t1 (fill) ; boost::thread t2 (print) ; boost::thread t3 (count) ; t1.join(); t2.join(); t3.join(); std ::cout << "Sum: " << sum << std ::endl ; return 0 ; }
效果:
这里多线程的效果就是似乎 fill
,print
,count
似乎是在同一时间一起完成的。
具体:
print()
和 count()
都可以只读访问 random_numbers
。
print()
函数把 random_numbers
里的最后一个数写到标准输出;
count()
函数把它统计到 sum
变量。 由于两个函数都 没有修改 random_numbers
,所以两个函数都可以在同一时间用 boost::shared_lock
类型的 非独占锁 访问它。
fill()
函数需要用一个boost::unique_lock
类型的独占锁,因为它需要向random_numbers
插入新的随机数,在lock
显示的调用unlock()
释放互斥量后,fill()
又等待1秒,保证了容器random_numbers
中至少存在一个随机数可以被print()
和count()
访问。对应地, 这两个函数在 for 循环的开始调用了 wait(1)
。
条件变量 condition_variable 条件变量,可以同步那些独立的线程,使数组的每个元素都被不同的线程立即添加到 number_randoms
中。
条件变量的使用总是和互斥体及共享资源联系在一起的。 线程首先锁住互斥体,然后检验共享资源的状态是否处于可使用的状态。如果不是,那么线程就要等待条件变量。要指向这样的操作就必须在等待的时候将互斥体解锁,以便其他线程可以访问共享资源并改变其状态。它还得保证从等到得线程返回时互斥体是被上锁得。当另一个线程改变了共享资源的状态时,它就要 通知 正在等待 条件变量 得线程,并将之返回等待的线程。
例子1 对上面的程序进行改进,为确保正确地处理随机数,需要一个 允许检查多个线程 之间 特定条件 的 条件变量 来 同步 每个独立的线程。删除了wait()
和 count()
。线程不用在每个循环迭代中等待一秒,而是尽可能快地执行:
来源
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 #include <boost/thread.hpp> #include <iostream> #include <vector> #include <cstdlib> #include <ctime> boost::mutex mutex; boost::condition_variable_any cond; std ::vector <int > random_numbers;void fill () { std ::srand(static_cast <unsigned int >(std ::time(0 ))); for (int i = 0 ; i < 3 ; ++i) { boost::unique_lock<boost::mutex> lock(mutex); random_numbers.push_back(std ::rand()); cond.notify_all(); cond.wait(mutex); } } void print () { std ::size_t next_size = 1 ; for (int i = 0 ; i < 3 ; ++i) { boost::unique_lock<boost::mutex> lock(mutex); while (random_numbers.size() != next_size) cond.wait(mutex); std ::cout << random_numbers.back() << std ::endl ; ++next_size; cond.notify_all(); } } int main () { boost::thread t1 (fill) ; boost::thread t2 (print) ; t1.join(); t2.join(); return 0 ; }
程序运行流程说明:
从main()
开始,两个线程t1(fill)
和t2(print)
,虽然是多线成,但其实两者还是存在 启动 执行的先后顺序,t1
会比t2
先启动一会儿,但这不是问题,现在先 假设 t1
线程先启动(t1启动后t2立马启动)。进入fill()
函数,std::srand
生成随机数,然后进入循环,在对random_numbers
操作前使用boost::unique_lock
进行加锁,保证下面push_bak()
时不会有其他线程同时操作这个数组引起冲突,等到push_back()
结束后,条件变量cond
通知其他等待该线程的线程的可以继续运行,然后自己进入wait()
状态。
现在看t1
运行的时候,t2
发生了什么?t2
进入print()
函数,进入循环执行到while(random_numbers.size() != next_size)
时,这时可能t1
线程中还没有运行到将随机数push_back
到random_numbers
这个操作,所以random_numbers.size()
不等于next_size
,进而进入cond.wait(mutex)
等待状态,等到t1
运行到cond.notify_all()
时才会被唤醒,进而释放互斥量mutex
,继续运行到t2
线程的cond.notify_all()
以通知t1
线程,然后循环到while(random_numbers.size() != next_size)
发现满足,继续进入wait
状态。
t1
线程此时在wait()
状态,收到t2
线程的notify_all()
后程序才继续运行,然后以此往复,直到三次循环结束。
例子2 来源
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 #include <iostream> #include <string> #include <thread> #include <mutex> #include <condition_variable> std ::mutex mutex;std ::condition_variable cv;std ::string data;bool ready = false ; bool processed = false ; void Worker () { std ::unique_lock<std ::mutex> lock(mutex); cv.wait(lock, [] { return ready; }); std ::cout << "工作线程正在处理数据..." << std ::endl ; std ::this_thread::sleep_for(std ::chrono::seconds(1 )); data += " 已处理" ; processed = true ; std ::cout << "工作线程通知数据已经处理完毕。" << std ::endl ; lock.unlock(); cv.notify_one(); } int main () { std ::thread worker (Worker) ; { std ::lock_guard<std ::mutex> lock(mutex); std ::cout << "主线程正在准备数据..." << std ::endl ; std ::this_thread::sleep_for(std ::chrono::seconds(1 )); data = "样本数据" ; ready = true ; std ::cout << "主线程通知数据已经准备完毕。" << std ::endl ; } cv.notify_one(); { std ::unique_lock<std ::mutex> lock(mutex); cv.wait(lock, [] { return processed; }); } std ::cout << "回到主线程,数据 = " << data << std ::endl ; worker.join(); return 0 ; }
输出效果:
程序运行流程说明:
条件变量被通知后,挂起的线程就被唤醒,但是唤醒也有可能是假唤醒,或者是因为超时等异常情况,所以被唤醒的线程仍要检查条件是否满足,所以 wait
是放在条件循环里面。cv.wait(lock, [] { return ready; });
相当于:while (!ready) { cv.wait(lock); }
。
例子3 来源
目的: 有两个线程被创建,一个在buffer中放入100个整数,另一个将它们从buffer中取出。这个有界的缓存一次只能存放10个整数,所以这两个线程必须周期性的等待另一个线程,函数put和get使用条件变量来保证线程等待完成操作所必须的状态。
说明: 原始程序感觉是存在问题的,并没有达到预期的效果,所以进行了修改
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 #include <boost/thread/thread.hpp> #include <boost/thread/mutex.hpp> #include <boost/thread/condition.hpp> #include <iostream> const int BUF_SIZE = 10 ;const int ITERS = 100 ;boost::mutex io_mutex; class buffer { public : typedef boost::mutex::scoped_lock scoped_lock; buffer(): p(0 ), c(0 ), full(0 ) { } void put (int m) { scoped_lock lock (mutex) ; if (full >= BUF_SIZE) { { boost::mutex::scoped_lock lock (io_mutex) ; std ::cout << "Buffer is full. Waiting..." << std ::endl ; } cond.notify_one(); while (full == BUF_SIZE) cond.wait(lock); } buf[p] = m; { boost::mutex::scoped_lock lock (io_mutex) ; std ::cout << "sending: " << m << std ::endl ; } p = (p + 1 ) % BUF_SIZE; ++full; } int get () { scoped_lock lk (mutex) ; if (full == 0 ) { { boost::mutex::scoped_lock lock (io_mutex) ; std ::cout << "Buffer is empty. Waiting..." << std ::endl ; } cond.notify_one(); while (full == 0 ) cond.wait(lk); } int i = buf[c]; c = (c + 1 ) % BUF_SIZE; --full; return i; } private : boost::mutex mutex; boost::condition cond; unsigned int p, c, full; int buf[BUF_SIZE]; }; buffer buf; void writer () { for (int n = 0 ; n <= ITERS; ++n) { buf.put(n); } } void reader () { for (int x = 0 ; x < ITERS; ++x) { int n = buf.get(); { boost::mutex::scoped_lock lock (io_mutex) ; std ::cout << "received: " << n << std ::endl ; } } } int main (int argc, char * argv[]) { boost::thread thrd1 (&reader) ; boost::thread thrd2 (&writer) ; thrd1.join(); thrd2.join(); return 0 ; }
最后效果(部分截图):
线程本地存储(TLS) 来源
线程本地存储(TLS)是一个只能由一个线程访问的专门的存储区域,TLS变量是一个只对某个特定线程而非整个程序可见的全局变量。
Boost线程库提供了智能指针boost::thread_specific_ptr
来访问本地存储线程。每一个线程第一次使用这个智能指针的实例时,它的初值是NULL,所以必须要先检查这个它的只是否为空,并且为它赋值。Boost线程库保证本地存储线程中保存的数据会在线程结束后被清除。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 #include <boost/thread/thread.hpp> #include <boost/thread/mutex.hpp> #include <boost/thread/tss.hpp> #include <iostream> boost::mutex io_mutex; boost::thread_specific_ptr<int > ptr; struct count { count(int id) : id(id) { } void operator () () { if (ptr.get() == 0 ) ptr.reset(new int (0 )); for (int i = 0 ; i < 10 ; ++i) { (*ptr)++; boost::mutex::scoped_lock lock (io_mutex) ; std ::cout << id << ": " << *ptr << std ::endl ; } } int id; }; int main (int argc, char * argv[]) { boost::thread thrd1 (count(1 )) ; boost::thread thrd2 (count(2 )) ; thrd1.join(); thrd2.join(); return 0 ; }
创建了两个线程来初始化本地存储线程,并有10次循环,每一次都会增加智能指针指向的值,并将其输出到std::cout
上(由于std::cout
是一个共享资源,所以通过互斥体进行同步)。main
线程等待这两个线程结束后就退出。从这个例子输出可以明白的看出每个线程都处理属于自己的数据实例,尽管它们都是使用同一个boost::thread_specific_ptr
。
输出:
显然虽然使用的是全局的boost::thread_specific_ptr
智能指针,但是各线程之间的值互不干扰。
其他:
关于 mute
和 lock
mutex
是一个类,用它可以生成相应的互斥体,然后呢,我们就可以把互斥体加入到我们的线程中去啦。互斥体本身里有lock
与unlock
的实现, 不过是私有的,只能通过它的友元函数来调用,我们没有办法直接用。想一个简单方法:所以用lock模板类
对它进行了封装,用lock模板类
生成的对象相进行lock
与unlock
。对于mutex
的使用,多个线程应该对应同一个mutex
对象。
对于lock模板类
(例如:boost::unique_lock(),boost::shared_lock()
)来说,它把对对应类型(指的是muetx类
的类型)lock
与unlock
操作放入到了它的构造与析构函数里了,这样可以自动完成它的使命了。
关于boost::condition_variable
对于notify_one
和notify_all
的作用就是:把一个线程对的共享资源unlock, notify_one
启用一个线程, notify_all
激活所有线程。
对于wait
,就是本线程得到互斥体后,本线程就给其加锁,然后在后续的操作中发现条件不满足时再把锁打开让给其他的线程,而自己进行阻塞,等待notify
,等收到之后再抢得互斥体得lock
,继续后续的操作。
参考: C++ Boost Thread 编程指南
The Boost C++ Libraries
菜鸟教程- C++ 多线程