受欢迎的博客标签

C++17:标准库启动线程的方法可分为三类:async、thread以及packaged_task

Published

C++标准库-17-并发

命名空间namespace std::this_thread提供了几个线程相关的专属操作:get_id、sleep_for、sleep_until以及yield。

C++标准库启动线程的方法可分为三类:async、thread以及packaged_task。其中,async一般和futrue或者shared_future搭配,thread一般和promise搭配(也要结合future使用),packaged_task一般和future搭配。

通过静态函数unsigned int std::thread::hardware_concurrency(),可以查询并行线程的可能数量(仅供参考)。

async和future

一般概念

利用async和future可以很方便地进行并发执行。

async会尝试在新的线程中异步执行对应的函数foo,但是不保证一定会执行。应将async返回值赋值给一个future对象fobj,通过调用该future对象的get成员函数,确保该函数foo的执行。

调用future对象的get成员函数,可能有三种情况发生:

  1. 如果foo函数被启动于一个新线程,并且已经执行结束,那么get立刻返回执行结果;

  2. 如果foo函数被启动于一个新的线程,但是尚未结束,那么get会阻塞等待foo执行结束后获得结果并返回;

  3. 如果foo尚未启动,会被强迫启动,类似于一个同步调用;get会阻塞知道foo执行结束。

注意事项:

  • 尽量推迟get成员函数的执行,即需要线程执行结果时再调用get;

  • 传递给async的可以是任何类型的callable对象;

  • 一个future<>对象,只能调用get一次,然后future对象就处于无效状态,通过future.valid()判断future对象是否有效;

  • async除了可以传递callable对象,还可以给该callable对象传递参数,参数位于callable对象之后;也可以传递成员函数,成员函数的后面应紧跟对应类的对象的引用或者指针;

示例如下:

 #include <iostream>
 #include <thread>
 #include <future>
 #include <chrono>
 #include <random>
 ​
 using namespace std;
 ​
 int do_something(char c)
 {
     default_random_engine dre(c);
     uniform_int_distribution<int> di(10,1000);
     
     for(int i=0;i<10;++i)
     {
         //线程挂起随机时间,毫秒;
         this_thread::sleep_for(chrono::milliseconds(di(dre)));
         cout.put(c).flush();
     }
     
     return c;
 }
 ​
 int func1()
 {
     return do_something('.');
 }
 ​
 int func2()
 {
     return do_something('+');
 }
 ​
 int main(int argc,char** argv)
 {
     cout << " starting func1() in background and func2() in foreground..." << endl;
     future<int> result1(async(func1)); //后台运行线程func1,返回结果存在result1中;
                                        //func1不保证一定立刻运行,也可能不运行;
     int result2 = func2(); //主线程中运行func2
 ​
     /***************************************************************************************
     +result1.get()保证了对应的线程运行,即确保了func1运行:
     +   (1)如果func1线程已经运行结束,get()直接返回结果;
     +   (2)如果func1线程正在运行,get()会阻塞主线程,等待func1线程运行结束再返回结果;
     +   (3)如果func1线程还未运行,则运行对应函数,等待结束后返回结果,这种情况一般出现在单线程系统或者禁用多线程的系统中。   ****************************************************************************************/
     int result = result2+result1.get();
 ​
     cout << "\nresult of func1()+func2() : " << result << endl;
 ​
     return 0;
 }

launch策略

async还可以指定launch策略,作为第一个参数传递给async,可选项有:

  • std::launch::async: enable asynchronous evaluation

  • std::launch::deferred: enable lazy evaluation

默认policy为“std::launch::async | std::launch::deferred”。

如果明确指定为std::launch::async,那么目标函数要么在新线程中异步执行,要么抛出异常std::system_error。

如果指定为std::launch::deferred,那么就会将目标函数的执行推迟到调用fobj.get时。

处理异常

future对象调用get()能够处理异常:线程中的异常会被保持并传递到get函数调用处,只需对get函数处理异常即可。

等待和轮询

一个future<>对象,只能调用get一次,然后future对象就处于无效状态。如果对future对象f调用wait函数,那么可以强制启动该future对象的线程,并等待这一个后台操作终止:

 std::future<T> f(std::async(func));
 //...
 f.wait();// wait for func to be done.
  • wait还有两个变种:wait_forwait_until

shared_future

通过使用shared_future对象,可以多次调用get,要么返回相同结果,要么都抛出异常.适用于在多个线程中查询后台线程执行情况。

可以直接将async赋值给shared_future<T>,也可以调用future的成员函数share:

 shared_future<int> f = async(querryNumber);
 ​
 auto f = async(querryNumber).share();//此后,async(querryNumber)对应的future对象状态为无效

一个简答示例如下:

 #include <future>
 #include <iostream>
 #include <cstdlib>
 ​
 using namespace std;
 ​
 int querryNumber()
 {
     cout << "Read Number:";
     int num;
     cin >> num;
 ​
     if(!cin)
     {
         throw runtime_error("No number read.");
     }
 ​
     return num;
 }
 ​
 void doSomething(char c,shared_future<int> f)
 {
     try
     {
         int num = f.get(); // 每个线程都调用get一次;
         for(int i=0;i<num;++i)
         {
             this_thread::sleep_for(chrono::milliseconds(500));
             cout.put(c).flush();
         }
     }
     catch(const std::exception& e)
     {
         cerr << "Exception in Thread_" << this_thread::get_id() << ": "
             << e.what() << endl;
     }
     
     
 }
 ​
 int main(int argc,char** argv)
 {
     try
     {
         shared_future<int> f = async(querryNumber);
         auto f1 = async(launch::async,doSomething,'+',f);
         auto f2 = async(launch::async,doSomething,'-',f);
         auto f3 = async(launch::async,doSomething,'*',f);
 ​
         f1.get();
         f2.get();
         f3.get();
     }
     catch(const std::exception& e)
     {
         std::cerr << e.what() << '\n';
     }
     cout << "\ndone" << endl;
 ​
     return EXIT_SUCCESS;
 }

Thread和Promise

thread

std::thread为底层接口,定义于头文件<thread>中。利用std::thread可以启动线程。给std::thread传入一个callable对象,可以启动线程来完成相应的任务。

 #include <thread>
 ​
 void doSomething();
 ​
 std::thread t(doSomething);
 //...
 t.join();//等待t线程结束;

一个std::thread启动后,要么等待其结束(join),要么将其卸离(detach),即运行于后台不受任何控制。

detached thread应该尽量只访问本地资源、对象;

std::thread线程内部如果发生了异常,且未被捕获,那么程序会立刻调用std::terminate()终止运行。

如果std::thread对象t被detached,当t声明周期结束,如main退出,那么对应的线程t也会被强制结束。

给线程传递参数时,应该尽量使用按值传递,避免按引用传参。

 

Promise

用来传递运行结果和异常(特殊的运行结果)的一般性机制:std::promise。通常定义线程任务函数时,接收一个std::promise<>引用作为参数,以便传递运行结果和异常。

std::promise在线程任务函数中设置执行结果或者异常。

在线程外,通常是主线程中,std::promise经常和future配合起来使用,std::promise对象的get_future()成员函数可以返回也给future对象,通过该future对象调用get()成员函数,可以等待线程执行结束或者抛出线程中的异常。

下面是一个简单示例:

 #include <iostream>
 #include <thread>
 #include <future>
 #include <chrono>
 #include <random>
 #include <exception>
 ​
 using namespace std;
 ​
 // thread function
 void doSomething(promise<string>& p)
 {
     try
     {
         cout << "read char ('x' for exception): ";
         char c = cin.get();
         if(c=='x')
         {
             throw runtime_error(string("char ")+c+" read");
         }
 ​
         cout << " - GET " << c << endl;
         string s = string("char ") + c + " processed.";
         p.set_value(std::move(s));
         //线程结束后才设置shared state 为ready
         //p.set_value_at_thread_exit(std::move(s));
         cout << " - after set_value ..." << endl;
     }
     catch(...)
     {
         //线程结束后才设置shared state 为ready
         //p.set_exception_at_thread_exit(current_exception());
         p.set_exception(current_exception());
     }    
 }
 ​
 /*---------------------------Main-------------------------------*/
 int main()
 {
     try
     {
         promise<string> p;
         thread t(doSomething,ref(p));
        
         t.detach();
 ​
         // sol - 1
         std::future<string> f(p.get_future());
         string r = f.get();//block the current thread until thread with f ends.
 ​
         // sol - 2
         //block the current thread until thread ends.
         //string r = p.get_future().get();
 ​
         cout << " - result:" << r << endl;
     }
     catch(const exception& e)
     {
         cerr << " - Exception:" << e.what() << endl;
     }
     catch(...)
     {
         cerr << " - Exceptions" << endl;
     }
     return 0;
 }

packaged_task<>

packaged_task<T>定义于头文件<future>中,模板参数T是callable类型,如double(int,int)。packaged_task<T>的构造函数的参数也应该传递一个T类型的callable对象。

packaged_task<T>定义的线程对象task,不是立刻执行,而是通过执行task(...)来开始执行。在调用task()之前,可以先获取对应的future,f = task.get_future()。通过f.get()等待task执行完毕返回结果。

 #include <future>
 using namespace std;
 ​
 double doSomething(int x,int y)
 {
     double r;
     //...
     
     return r;
 }
 ​
 int main(int argc,char** argv)
 {
     packaged_task<double(int,int)> task(doSomething);//不用传递参数,执行时才传递;
     future<double> f = task.get_future();
     
     int num1,num2;
     //...
     task(num1,num2); //异步执行,通常(非一定)启动于一分离线程;
     
     //...
     double result = f.get();//等待线程结束,或者抛出异常;
     
     //...
     return 0;
 }

 

线程同步

Resource Acquisition Is Initialization.

"RAII":构造函数获取资源,析构函数释放资源。

mutex和lock

mutex

凡是可能发生并发访问的地方,都应该使用同一个mutex。同时,要确保异常也能释放所占用的mutex。

不要直接lock一个mutex,即不要直接执行"mutex_obj.lock()",应该通过STL提供的std::lock_guard来获取mutex,这样获取的mutex会阻塞其他试图获取mutex的代码,也会自动释放mutex。要注意的是,尽量将这样的lock限制在最短的时间内。以下是一个简单示例:

 /*
 mutex demo:同步输出
 */
 ​
 #include <mutex>
 #include <future>
 #include <iostream>
 #include <string>
 ​
 using namespace std;
 ​
 mutex printMutex;
 ​
 /**
  * lock_guard是RAII
  **/
 void print(const string& s)
 {
     //通过lock_guard来获取mutex;
     lock_guard<std::mutex> lg(printMutex);
     for(char c:s)
     {
         cout.put(c);
     }
     cout << endl;
 }
 ​
 int main(int argc,char** argv)
 {
     auto f1 = async(print," - thread-1: hello world.");
     auto f2 = async(print," - thread-2: hello world.");
 ​
     print(" - thread-0: hello world!");
 ​
     return 0;
 }

C++ STL提供了不同的mutex和lock。

recursive mutex

std::recursive_mutex允许在同一个线程中多次lock该recursive_mutex并释放(unlock)。对应可以使用lock_guard<std::recursive_mutex> lg(re_mx)。最后一次unlock()时,释放lock。

Tried and Timed lock

如果程序需要获取一个mutex,但是如果不成功的话就执行其他操作,而不是阻塞,那么就可以使用std::mutex::try_lock()。try_lock会试图获取一个lock,如果成功返回true,否则返回false。如果返回true的话,那么该mutex就被占用了,需要执行unlock()。

try_lock之后,为了仍然能够使用lock_guard,以便当前作用域会自动释放mutex,在创建lock_guard的时候,可以传递额外参数adopt_lock给其构造函数。

  • try_lock()可能会假性失败。

// - example 1
std::mutex m;
if(m.try_lock() == true)
{
	//...
	m.unlock();
}
else
{
	// do something else
}

// - example 2
std::mutex m;
if(m.try_lock() == false)
{
    // do something else
}

std::lock_guard<std::mutex> lg(m,std::adopt_lock);

timed_mutex

为了能够等待特定时长,可选用std::timed_mutex或者std::recursive_timed_mutex。此二者提供了两个成员函数

try_lock_for()和try_lock_until(),通过这两个函数可以等待一段时间或者到达某个时间点,其功能类似。STL文档中,try_lock_for的功能介绍如下:

Tries to lock the mutex. Blocks until specified timeout_duration has elapsed or the lock is acquired, whichever comes first. On successful lock acquisition returns true, otherwise returns false.

这两个函数如果成功获取mutex,则返回true;如果等待时间超时还未获取mutex,则返回false。

处理多个mutex

有时,需要同时锁定多个mutex。通过全局函数std::lock(...)可以锁定多个mutex。lock会阻塞,直到锁定所有的mutex,或者抛出异常(同时释放所有已经获得的mutex)。与try_lock和timed_mutex一样,获取到mutex后,需要lock_guard处理每一个锁定的mutex,同时要传递第二个参数std::adopt_lock。

std::mutex m1;
std::mutex m2;
//...
{
	std::lock(m1,m2);
	std::lock_guard<std::mutex> lockM1(m1,std::adopt_lock);
	std::lock_guard<std::mutex> lockM2(m2,std::adopt_lock);
	//...
}//自动unlock所有的mutex;

还可以使用std::try_lock(...)尝试锁定多个mutex。如果获得了所有的mutex,则返回-1;否则返回第一个失败的mutex的索引(从0开始计数),同时释放所有已经获得的mutex。

unique_lock

C++ STL提供了比lock_guard更为灵活的处理mutex的类,unique_lock类。该类在析构的时候,如果锁定了某个mutex,则会自动释放该mutex,否则不做任何相关操作。

unique_lock可以通过调用owns_lock()或者bool operator()来判断是否锁定mutex。

unique_lock的构造函数如果不指定第二个参数,如果没有获取到mutex则会阻塞,直到获取到mutex。

//一个简单示例
#include <mutex>
#include <iostream>
#include <chrono>
#include <future>


std::mutex m;
void thread1()
{
    while(true)
    {
        std::lock_guard<std::mutex> lg(m);
        std::cout << '+';
        std::this_thread::yield();
        std::this_thread::sleep_for(std::chrono::seconds(1));

    }
}

void thread2()
{
    while(true)
    {
        std::unique_lock<std::mutex> ulock(m);
        std::cout << '-';
        std::this_thread::yield();
        std::this_thread::sleep_for(std::chrono::seconds(1));
    }
}

int main(int argc,char** argv)
{
    std::future<void> t1(std::async(thread1));
    std::future<void> t2(std::async(thread2));
    
    t1.get();
    t2.get();

    return 0;
}

unique_lock构造函数可以对mutex先不锁定lock,也可以尝试锁定,都是通过传递第二个参数给构造函数实现。

//使用unique_lock
// -1-
//nonblocking attempt to lock a mutex;
std::unique_lock<std::mutex> lock(mutex,std::try_to_lock);
if(lock){
...
}

// -2-
//timed_mutex
std::unique_lock<std::timed_mutex> lock(mutex,std::chrono::seconds(2));

// -3-
//defer_lock
std::unique_lock<std::mutex> lock(mutex,std::defer_lock);
//...
lock.lock();// or (timed) try_lock();
//...

unique_lock可以通过move操作将mutex的所有权转移给另外的unique_lock,此时原来的unique_lock不在拥有该mutex。不能复制mutex的所有权。unique_lock可以通过release()函数来释放拥有的mutex。

#include <mutex>
#include <iostream>

int main(int argc,char** argv)
{
    std::mutex m;
    std::unique_lock<std::mutex> ul(m);

    if(ul)
    {
        // go to here
        std::cout << "ul owns the mutex" << std::endl;
    }
    else
    {
        std::cout << "ul does not own the mutex." << std::endl;
    }
    
    // error: copy constructor is not supported.
    //std::unique_lock<std::mutex> ulock2(ul);

    // ok
    std::unique_lock<std::mutex> ulock2(std::move(ul));

    if(ul)
    {
        std::cout << "ul owns the mutex" << std::endl;
    }
    else
    {
        // go to here
        std::cout << "ul does not own the mutex." << std::endl;
    }

    if(ulock2)
    {
        // go to here
        std::cout << "ulock2 owns the mutex" << std::endl;
    }
    else
    {
        std::cout<< "ulock2 does not own the mutex" << std::endl;
    }
    

    return 0;
}

once_flag和call_once

<mutex>提供了once_flag和call_once用来表明只执行一次的代码,常用于初始化资源。

将once_flag对象作为第一个参数传递给call_once。call_once的第二个参数为要执行的callable对象foo,后面可以跟函数foo的参数。

#inclue <mutex>

std::once_flag is_initialized;

//...

//调用初始化函数func_initialization一次;
std::call_once(is_initialized,func_initialization,...);

如果call_once执行时抛出异常,则视为不成功,那么还能再执行。

条件变量

future主要用来处理线程的返回值或者异常。通过条件变量,可以处理线程之间的数据流逻辑依赖关系。

头文件:<condition_variable>

条件变量一般和mutex同时使用。

#include <mutex>
#include <condition_variable>

//定义条件变量和mutex
std::mutex readyMutex;
std::condition_variable readyCondVar;

//提供资源的线程t1:准备好之后,提示等待资源的线程;
//通知操作不用放在保护区内;
readyCondVar.notify_one();
//或者
readyCondVar.notify_all();

// 等待资源的线程t2:
std::unique_lock<std::mutex> ulock(readyMutex);
readyCondVar.wait(ulock);

等待资源的线程即使被唤醒,不一定意味着条件已经满足,还需要再进行验证,一般可在条件变量的wait成员函数中指定第二个参数来进行验证。第二个参数时一个callable对象,返回bool值。

readyCondVar.wait(ulock,[]{return readyFlag;});

通知操作不用放在保护区内;

此外,条件变量还提供了wait_until和wait_for,用来等待直到某个时间点。

 

Atomic

Atomic operations are guaranteed to happen in the order as programmed.

一个简单示例(atomic高层接口)

#include <atomic>  // 1. 头文件
//...

/*
2. 定义一个atomic模板对象
	模板参数可以为bool、int、指针等各种常规类型;
	同时要用一个常量对atomic对象进行初始化;
	如果定义时用的默认构造函数,即没有用常量进行初始化,那么定义语句后面的唯一允许对该atomic对象的操作是初始化,即下面的代码“std::atomic_init(&readyFlag,false);”;
*/
std::atomic<bool> readyFlag(false); //定义一个atomic模板对象,一定要进行初始化

void thread1()
{
    //...
    readyFlag.store(true); // 3. 给atomic对象赋新值;
}

void thread2()
{
    //...
    while(!readyFlag.load()) // 4. 获取atomic对象当前值;
    {
        std::this_thread::sleep_for(std::chrono::miliseconds(100));
    }
    
    //...
}

上述代码对atomic对象的操作都是原子性的,不需要mutex进行保护。