C++标准库-17-并发
命名空间namespace std::this_thread提供了几个线程相关的专属操作:get_id、sleep_for、sleep_until以及yield。
C++标准库启动线程的方法可分为三类:async、thread以及packaged_task。其中,async一般和futrue或者shared_future搭配,thread一般和promise搭配(也要结合future使用),packaged_task一般和future搭配。
通过静态函数unsigned int std::thread::hardware_concurrency(),可以查询并行线程的可能数量(仅供参考)。
async和future
一般概念
利用async和future可以很方便地进行并发执行。
async会尝试在新的线程中异步执行对应的函数foo,但是不保证一定会执行。应将async返回值赋值给一个future对象fobj,通过调用该future对象的get成员函数,确保该函数foo的执行。
调用future对象的get成员函数,可能有三种情况发生:
如果foo函数被启动于一个新线程,并且已经执行结束,那么get立刻返回执行结果;
如果foo函数被启动于一个新的线程,但是尚未结束,那么get会阻塞等待foo执行结束后获得结果并返回;
如果foo尚未启动,会被强迫启动,类似于一个同步调用;get会阻塞知道foo执行结束。
注意事项:
尽量推迟get成员函数的执行,即需要线程执行结果时再调用get;
传递给async的可以是任何类型的callable对象;
一个future<>对象,只能调用get一次,然后future对象就处于无效状态,通过future.valid()判断future对象是否有效;
async除了可以传递callable对象,还可以给该callable对象传递参数,参数位于callable对象之后;也可以传递成员函数,成员函数的后面应紧跟对应类的对象的引用或者指针;
示例如下:
#include <iostream>
#include <thread>
#include <future>
#include <chrono>
#include <random>
using namespace std;
int do_something(char c)
{
default_random_engine dre(c);
uniform_int_distribution<int> di(10,1000);
for(int i=0;i<10;++i)
{
//线程挂起随机时间,毫秒;
this_thread::sleep_for(chrono::milliseconds(di(dre)));
cout.put(c).flush();
}
return c;
}
int func1()
{
return do_something('.');
}
int func2()
{
return do_something('+');
}
int main(int argc,char** argv)
{
cout << " starting func1() in background and func2() in foreground..." << endl;
future<int> result1(async(func1)); //后台运行线程func1,返回结果存在result1中;
//func1不保证一定立刻运行,也可能不运行;
int result2 = func2(); //主线程中运行func2
/***************************************************************************************
+result1.get()保证了对应的线程运行,即确保了func1运行:
+ (1)如果func1线程已经运行结束,get()直接返回结果;
+ (2)如果func1线程正在运行,get()会阻塞主线程,等待func1线程运行结束再返回结果;
+ (3)如果func1线程还未运行,则运行对应函数,等待结束后返回结果,这种情况一般出现在单线程系统或者禁用多线程的系统中。 ****************************************************************************************/
int result = result2+result1.get();
cout << "\nresult of func1()+func2() : " << result << endl;
return 0;
}
launch策略
async还可以指定launch策略,作为第一个参数传递给async,可选项有:
std::launch::async: enable asynchronous evaluation
std::launch::deferred: enable lazy evaluation
默认policy为“std::launch::async | std::launch::deferred”。
如果明确指定为std::launch::async,那么目标函数要么在新线程中异步执行,要么抛出异常std::system_error。
如果指定为std::launch::deferred,那么就会将目标函数的执行推迟到调用fobj.get时。
处理异常
future对象调用get()能够处理异常:线程中的异常会被保持并传递到get函数调用处,只需对get函数处理异常即可。
等待和轮询
一个future<>对象,只能调用get一次,然后future对象就处于无效状态。如果对future对象f调用wait函数,那么可以强制启动该future对象的线程,并等待这一个后台操作终止:
std::future<T> f(std::async(func));
//...
f.wait();// wait for func to be done.
wait还有两个变种:wait_for和wait_until。
shared_future
通过使用shared_future对象,可以多次调用get,要么返回相同结果,要么都抛出异常.适用于在多个线程中查询后台线程执行情况。
可以直接将async赋值给shared_future<T>,也可以调用future的成员函数share:
shared_future<int> f = async(querryNumber);
auto f = async(querryNumber).share();//此后,async(querryNumber)对应的future对象状态为无效
一个简答示例如下:
#include <future>
#include <iostream>
#include <cstdlib>
using namespace std;
int querryNumber()
{
cout << "Read Number:";
int num;
cin >> num;
if(!cin)
{
throw runtime_error("No number read.");
}
return num;
}
void doSomething(char c,shared_future<int> f)
{
try
{
int num = f.get(); // 每个线程都调用get一次;
for(int i=0;i<num;++i)
{
this_thread::sleep_for(chrono::milliseconds(500));
cout.put(c).flush();
}
}
catch(const std::exception& e)
{
cerr << "Exception in Thread_" << this_thread::get_id() << ": "
<< e.what() << endl;
}
}
int main(int argc,char** argv)
{
try
{
shared_future<int> f = async(querryNumber);
auto f1 = async(launch::async,doSomething,'+',f);
auto f2 = async(launch::async,doSomething,'-',f);
auto f3 = async(launch::async,doSomething,'*',f);
f1.get();
f2.get();
f3.get();
}
catch(const std::exception& e)
{
std::cerr << e.what() << '\n';
}
cout << "\ndone" << endl;
return EXIT_SUCCESS;
}
Thread和Promise
thread
std::thread为底层接口,定义于头文件<thread>中。利用std::thread可以启动线程。给std::thread传入一个callable对象,可以启动线程来完成相应的任务。
#include <thread>
void doSomething();
std::thread t(doSomething);
//...
t.join();//等待t线程结束;
一个std::thread启动后,要么等待其结束(join),要么将其卸离(detach),即运行于后台不受任何控制。
detached thread应该尽量只访问本地资源、对象;
std::thread线程内部如果发生了异常,且未被捕获,那么程序会立刻调用std::terminate()终止运行。
如果std::thread对象t被detached,当t声明周期结束,如main退出,那么对应的线程t也会被强制结束。
给线程传递参数时,应该尽量使用按值传递,避免按引用传参。
Promise
用来传递运行结果和异常(特殊的运行结果)的一般性机制:std::promise。通常定义线程任务函数时,接收一个std::promise<>引用作为参数,以便传递运行结果和异常。
std::promise在线程任务函数中设置执行结果或者异常。
在线程外,通常是主线程中,std::promise经常和future配合起来使用,std::promise对象的get_future()成员函数可以返回也给future对象,通过该future对象调用get()成员函数,可以等待线程执行结束或者抛出线程中的异常。
下面是一个简单示例:
#include <iostream>
#include <thread>
#include <future>
#include <chrono>
#include <random>
#include <exception>
using namespace std;
// thread function
void doSomething(promise<string>& p)
{
try
{
cout << "read char ('x' for exception): ";
char c = cin.get();
if(c=='x')
{
throw runtime_error(string("char ")+c+" read");
}
cout << " - GET " << c << endl;
string s = string("char ") + c + " processed.";
p.set_value(std::move(s));
//线程结束后才设置shared state 为ready
//p.set_value_at_thread_exit(std::move(s));
cout << " - after set_value ..." << endl;
}
catch(...)
{
//线程结束后才设置shared state 为ready
//p.set_exception_at_thread_exit(current_exception());
p.set_exception(current_exception());
}
}
/*---------------------------Main-------------------------------*/
int main()
{
try
{
promise<string> p;
thread t(doSomething,ref(p));
t.detach();
// sol - 1
std::future<string> f(p.get_future());
string r = f.get();//block the current thread until thread with f ends.
// sol - 2
//block the current thread until thread ends.
//string r = p.get_future().get();
cout << " - result:" << r << endl;
}
catch(const exception& e)
{
cerr << " - Exception:" << e.what() << endl;
}
catch(...)
{
cerr << " - Exceptions" << endl;
}
return 0;
}
packaged_task<>
packaged_task<T>定义于头文件<future>中,模板参数T是callable类型,如double(int,int)。packaged_task<T>的构造函数的参数也应该传递一个T类型的callable对象。
packaged_task<T>定义的线程对象task,不是立刻执行,而是通过执行task(...)来开始执行。在调用task()之前,可以先获取对应的future,f = task.get_future()。通过f.get()等待task执行完毕返回结果。
#include <future>
using namespace std;
double doSomething(int x,int y)
{
double r;
//...
return r;
}
int main(int argc,char** argv)
{
packaged_task<double(int,int)> task(doSomething);//不用传递参数,执行时才传递;
future<double> f = task.get_future();
int num1,num2;
//...
task(num1,num2); //异步执行,通常(非一定)启动于一分离线程;
//...
double result = f.get();//等待线程结束,或者抛出异常;
//...
return 0;
}
线程同步
Resource Acquisition Is Initialization.
"RAII":构造函数获取资源,析构函数释放资源。
mutex和lock
mutex
凡是可能发生并发访问的地方,都应该使用同一个mutex。同时,要确保异常也能释放所占用的mutex。
不要直接lock一个mutex,即不要直接执行"mutex_obj.lock()",应该通过STL提供的std::lock_guard来获取mutex,这样获取的mutex会阻塞其他试图获取mutex的代码,也会自动释放mutex。要注意的是,尽量将这样的lock限制在最短的时间内。以下是一个简单示例:
/*
mutex demo:同步输出
*/
#include <mutex>
#include <future>
#include <iostream>
#include <string>
using namespace std;
mutex printMutex;
/**
* lock_guard是RAII
**/
void print(const string& s)
{
//通过lock_guard来获取mutex;
lock_guard<std::mutex> lg(printMutex);
for(char c:s)
{
cout.put(c);
}
cout << endl;
}
int main(int argc,char** argv)
{
auto f1 = async(print," - thread-1: hello world.");
auto f2 = async(print," - thread-2: hello world.");
print(" - thread-0: hello world!");
return 0;
}
C++ STL提供了不同的mutex和lock。
recursive mutex
std::recursive_mutex允许在同一个线程中多次lock该recursive_mutex并释放(unlock)。对应可以使用lock_guard<std::recursive_mutex> lg(re_mx)。最后一次unlock()时,释放lock。
Tried and Timed lock
如果程序需要获取一个mutex,但是如果不成功的话就执行其他操作,而不是阻塞,那么就可以使用std::mutex::try_lock()。try_lock会试图获取一个lock,如果成功返回true,否则返回false。如果返回true的话,那么该mutex就被占用了,需要执行unlock()。
try_lock之后,为了仍然能够使用lock_guard,以便当前作用域会自动释放mutex,在创建lock_guard的时候,可以传递额外参数adopt_lock给其构造函数。
try_lock()可能会假性失败。
// - example 1
std::mutex m;
if(m.try_lock() == true)
{
//...
m.unlock();
}
else
{
// do something else
}
// - example 2
std::mutex m;
if(m.try_lock() == false)
{
// do something else
}
std::lock_guard<std::mutex> lg(m,std::adopt_lock);
timed_mutex
为了能够等待特定时长,可选用std::timed_mutex或者std::recursive_timed_mutex。此二者提供了两个成员函数
try_lock_for()和try_lock_until(),通过这两个函数可以等待一段时间或者到达某个时间点,其功能类似。STL文档中,try_lock_for的功能介绍如下:
Tries to lock the mutex. Blocks until specified timeout_duration has elapsed or the lock is acquired, whichever comes first. On successful lock acquisition returns true, otherwise returns false.
这两个函数如果成功获取mutex,则返回true;如果等待时间超时还未获取mutex,则返回false。
处理多个mutex
有时,需要同时锁定多个mutex。通过全局函数std::lock(...)可以锁定多个mutex。lock会阻塞,直到锁定所有的mutex,或者抛出异常(同时释放所有已经获得的mutex)。与try_lock和timed_mutex一样,获取到mutex后,需要lock_guard处理每一个锁定的mutex,同时要传递第二个参数std::adopt_lock。
std::mutex m1;
std::mutex m2;
//...
{
std::lock(m1,m2);
std::lock_guard<std::mutex> lockM1(m1,std::adopt_lock);
std::lock_guard<std::mutex> lockM2(m2,std::adopt_lock);
//...
}//自动unlock所有的mutex;
还可以使用std::try_lock(...)尝试锁定多个mutex。如果获得了所有的mutex,则返回-1;否则返回第一个失败的mutex的索引(从0开始计数),同时释放所有已经获得的mutex。
unique_lock
C++ STL提供了比lock_guard更为灵活的处理mutex的类,unique_lock类。该类在析构的时候,如果锁定了某个mutex,则会自动释放该mutex,否则不做任何相关操作。
unique_lock可以通过调用owns_lock()或者bool operator()来判断是否锁定mutex。
unique_lock的构造函数如果不指定第二个参数,如果没有获取到mutex则会阻塞,直到获取到mutex。
//一个简单示例
#include <mutex>
#include <iostream>
#include <chrono>
#include <future>
std::mutex m;
void thread1()
{
while(true)
{
std::lock_guard<std::mutex> lg(m);
std::cout << '+';
std::this_thread::yield();
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}
void thread2()
{
while(true)
{
std::unique_lock<std::mutex> ulock(m);
std::cout << '-';
std::this_thread::yield();
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}
int main(int argc,char** argv)
{
std::future<void> t1(std::async(thread1));
std::future<void> t2(std::async(thread2));
t1.get();
t2.get();
return 0;
}
unique_lock构造函数可以对mutex先不锁定lock,也可以尝试锁定,都是通过传递第二个参数给构造函数实现。
//使用unique_lock
// -1-
//nonblocking attempt to lock a mutex;
std::unique_lock<std::mutex> lock(mutex,std::try_to_lock);
if(lock){
...
}
// -2-
//timed_mutex
std::unique_lock<std::timed_mutex> lock(mutex,std::chrono::seconds(2));
// -3-
//defer_lock
std::unique_lock<std::mutex> lock(mutex,std::defer_lock);
//...
lock.lock();// or (timed) try_lock();
//...
unique_lock可以通过move操作将mutex的所有权转移给另外的unique_lock,此时原来的unique_lock不在拥有该mutex。不能复制mutex的所有权。unique_lock可以通过release()函数来释放拥有的mutex。
#include <mutex>
#include <iostream>
int main(int argc,char** argv)
{
std::mutex m;
std::unique_lock<std::mutex> ul(m);
if(ul)
{
// go to here
std::cout << "ul owns the mutex" << std::endl;
}
else
{
std::cout << "ul does not own the mutex." << std::endl;
}
// error: copy constructor is not supported.
//std::unique_lock<std::mutex> ulock2(ul);
// ok
std::unique_lock<std::mutex> ulock2(std::move(ul));
if(ul)
{
std::cout << "ul owns the mutex" << std::endl;
}
else
{
// go to here
std::cout << "ul does not own the mutex." << std::endl;
}
if(ulock2)
{
// go to here
std::cout << "ulock2 owns the mutex" << std::endl;
}
else
{
std::cout<< "ulock2 does not own the mutex" << std::endl;
}
return 0;
}
once_flag和call_once
<mutex>提供了once_flag和call_once用来表明只执行一次的代码,常用于初始化资源。
将once_flag对象作为第一个参数传递给call_once。call_once的第二个参数为要执行的callable对象foo,后面可以跟函数foo的参数。
#inclue <mutex>
std::once_flag is_initialized;
//...
//调用初始化函数func_initialization一次;
std::call_once(is_initialized,func_initialization,...);
如果call_once执行时抛出异常,则视为不成功,那么还能再执行。
条件变量
future主要用来处理线程的返回值或者异常。通过条件变量,可以处理线程之间的数据流逻辑依赖关系。
头文件:<condition_variable>
条件变量一般和mutex同时使用。
#include <mutex>
#include <condition_variable>
//定义条件变量和mutex
std::mutex readyMutex;
std::condition_variable readyCondVar;
//提供资源的线程t1:准备好之后,提示等待资源的线程;
//通知操作不用放在保护区内;
readyCondVar.notify_one();
//或者
readyCondVar.notify_all();
// 等待资源的线程t2:
std::unique_lock<std::mutex> ulock(readyMutex);
readyCondVar.wait(ulock);
等待资源的线程即使被唤醒,不一定意味着条件已经满足,还需要再进行验证,一般可在条件变量的wait成员函数中指定第二个参数来进行验证。第二个参数时一个callable对象,返回bool值。
readyCondVar.wait(ulock,[]{return readyFlag;});
通知操作不用放在保护区内;
此外,条件变量还提供了wait_until和wait_for,用来等待直到某个时间点。
Atomic
Atomic operations are guaranteed to happen in the order as programmed.
一个简单示例(atomic高层接口)
#include <atomic> // 1. 头文件
//...
/*
2. 定义一个atomic模板对象
模板参数可以为bool、int、指针等各种常规类型;
同时要用一个常量对atomic对象进行初始化;
如果定义时用的默认构造函数,即没有用常量进行初始化,那么定义语句后面的唯一允许对该atomic对象的操作是初始化,即下面的代码“std::atomic_init(&readyFlag,false);”;
*/
std::atomic<bool> readyFlag(false); //定义一个atomic模板对象,一定要进行初始化
void thread1()
{
//...
readyFlag.store(true); // 3. 给atomic对象赋新值;
}
void thread2()
{
//...
while(!readyFlag.load()) // 4. 获取atomic对象当前值;
{
std::this_thread::sleep_for(std::chrono::miliseconds(100));
}
//...
}
上述代码对atomic对象的操作都是原子性的,不需要mutex进行保护。