概述

C++通过<future>头文件提供了一组支持异步编程的工具,使用这些工具比直接进行多线程操作更加高级、更加简便。

主要包括如下的类型:

  • std::future:表示异步操作的结果,这个结果在未来可能可用,支持查询操作的状态,等待操作完成和获取结果。注意用于获取结果的get()方法调用会阻塞当前执行流,直到结果准备就绪。
  • std::promise:承诺在未来提供一个可用的值,通常与 std::future 配对使用,set_result()可以设置异步操作的结果。可用get_future()提取获得一个关联的std::future对象。
  • std::packaged_task:封装一个函数或可调用对象,使其可以作为异步任务执行。可用get_future()获得一个关联的std::future对象。

还包括如下的函数:

  • std::async:用于启动异步任务,返回一个std::future对象代表任务的结果,注意我们必须要用变量接收这个返回值,否则当前语句会阻塞式的等待任务结束,因为只有异步任务结束才会销毁返回的临时变量!

这里std::futurestd::promise是成对使用的,std::packaged_task类型和std::async函数则是对异步编程的进一步封装和简化,可以避免显式处理std::promise对象。

实例

我们直接用几个例子来解释异步编程的基本用法,从最简单的例子开始

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#include <cmath>
#include <future>
#include <iostream>

int main() {
std::promise<double> prom;
std::future<double> fut = prom.get_future();

// 在另一个线程中设置结果
std::thread t([&prom](double x) { prom.set_value(sqrt(x)); }, 2.0);

// 等待并展示结果
std::cout << "Result: " << fut.get() << '\n';

t.join();
return 0;
}

代码解释如下:

  • std::promise代表一个承诺:
    • 可以以引用传递方式传递到子线程中,并通过set_value()方法设置值
    • 可以使用get_future()方法获取std::future对象
  • std::future代表一个异步编程的结果:可以通过get()方法获取结果

第二个例子是std::packaged_task的使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#include <cmath>
#include <future>
#include <iostream>

int main() {
std::packaged_task<double(double)> task([](double x) { return sqrt(x); });
std::future<double> result = task.get_future();

// 将task移动到另一个线程中执行(加上需要传入的参数)
std::thread th(std::move(task), 2.0);

// 等待并展示结果
std::cout << "Result: " << result.get() << '\n';

th.join();
return 0;
}

std::packaged_task的使用比第一个例子更加简单,省略了std::promise的定义和使用, 自动包装一个可调用对象,并把执行结果传递给std::future对象。

第三个例子是std::async函数的使用,仍然需要提供一个可调用对象(以及需要传入的参数)

1
2
3
4
5
6
7
8
9
10
11
12
13
#include <cmath>
#include <future>
#include <iostream>

int main() {
std::future<double> result =
std::async(std::launch::async, [](double x) { return sqrt(x); }, 2.0);

// 等待并展示结果
std::cout << "Result: " << result.get() << '\n';

return 0;
}

std::async的使用更加高级和简洁,完全不需要手动创建和管理线程,返回的std::future对象可以获取可调用对象的结果。

std::future

std::future对象通常不会直接创建,而是通过如下几种方式获得:

  • std::promise对象的get_future()方法
  • std::packaged_task对象的get_future()方法
  • std::async函数的返回值

这些方式获得的对象自动与对应的异步操作相关联。

std::future对象支持如下方法:

  • get():获取对应异步操作的结果。如果结果尚未准备好,此调用将阻塞,直到结果可用。(暂不讨论异步操作中的异常问题)
  • wait():阻塞当前线程,进入无限等待状态,直到对应的异步操作完成,无返回值。
  • wait_for:等待指定的时间段,在这段时间内异步操作完成或超时都将结束等待。
  • wait_until:等待直到指定的时间点,在时间点之间异步操作完成或超时都将结束等待。
  • valid():检查std::future对象是否有效,即是否关联了一个异步操作,返回布尔值。

其中get()wait()都会阻塞当前线程直到任务完成,但是wait()可以多次调用,而get()只允许调用一次。

可以使用wait_for(0)实现非阻塞式的检查

1
2
3
4
5
6
if(fut.wait_for(std::chrono::seconds(0)) == std::future_status::ready) {
// 操作已完成
}
else {
// 操作尚未完成
}

std::promise

std::promise通常和std::future成对出现:(使用相同的类型模板参数)

  • std::promise用于在某一线程中通过set_future()设置某个值
  • std::future则用于在另一线程中通过get()获取这个值。

通常先创建std::promise对象,然后使用get_future()创建与之关联的std::future对象。

由于std::promise对象不支持,我们必须通过移动或者引用传递的方式提供给子线程。 对于更复杂的情况,则需要使用共享的std::shared_future类型,它相比于std::future有更弱的所有权, 允许多个线程都通过get()获取结果。(std::future只能调用一次get()

使用std::shared_future的示例如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#include <future>
#include <iostream>
#include <thread>

int main() {
std::promise<int> prom;
std::thread t([&]() {
std::this_thread::sleep_for(std::chrono::seconds(2));
prom.set_value(42);
});
t.detach();

std::shared_future<int> sharedFuture = prom.get_future().share();

std::cout << "Starting tasks...\n";

// 在两个不同的任务中使用shared_future
std::thread task1([&]() { std::cout << sharedFuture.get() << '\n'; });
std::thread task2([&]() { std::cout << sharedFuture.get() << '\n'; });

task1.join();
task2.join();

std::cout << "Tasks completed.\n";
return 0;
}

虽然多个线程都可以用get()获取结果,但是显然在结果尚未就绪时,对应的线程仍然需要陷入阻塞式的等待中。

std::packaged_task

std::packaged_task只是对可调用对象的一次封装,省略了std::promise的角色, 并且显然和std::promise一样不支持拷贝,只能使用移动的方式传递给子线程,其它没什么好说的。

不同编译器对于std::packaged_task的实现还不一样,例如gcc允许对其进行移动,但是MSVC似乎不允许。

std::async

std::async的调用方式有两种:

  • 第一种方式需要依次传入启动策略、可调用对象、可调用对象需要的参数;
  • 第一种方式只需要传入可调用对象、可调用对象需要的参数,使用默认的启动策略。

两种用法示例如下

1
2
3
4
5
std::future<double> result1 =
std::async(std::launch::async, [](double x) { return sqrt(x); }, 2.0);

std::future<double> result2 =
std::async([](double x) { return sqrt(x); }, 2.0);

std::async接受的启动策略通过std::launch枚举类提供:

  • std::launch::async:表示任务将立刻在另一个新线程中异步执行
  • std::launch::deferred:表示任务会被延迟执行,直到需要提供结果时才会在当前线程中同步执行,例如用户调用std::future::get()std::future::wait()函数时。
  • std::launch::async | std::launch::deferred:这是上面两个策略的组合,任务既可以在一个单独的线程上异步执行,也可以选择延迟执行,取决于具体实现,不同的编译器和操作系统可能会有不同的默认行为。

在启动策略缺省时,std::async会使用std::launch::async | std::launch::deferred策略。

需要强调的是,我们必须使用std::future对象来接收std::async函数的返回值,否则产生的临时对象会直到异步操作完成才会析构,这会对主线程产生阻塞。

C++ 实现简易线程池

下面提供一个线程池的简易实现(参考 C++ 并发三剑客future, promise和async

simple_thread_pool.hpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
#pragma once

#include <atomic>
#include <condition_variable>
#include <future>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

class SimpleThreadPool {
public:
// 构造时自动开启线程池
explicit SimpleThreadPool(uint32_t thread_num)
: m_thread_num(thread_num > 0 ? thread_num : 1) {
start();
}

// 析构时自动关闭线程池
~SimpleThreadPool() { stop(); }

// 禁止复制
SimpleThreadPool(const SimpleThreadPool &) = delete;
SimpleThreadPool &operator=(const SimpleThreadPool &) = delete;

// 提交任务,含参数,返回future
template <class F, class... Args>
auto commit(F &&f, Args &&...args) {
// 返回类型
using RetType = std::invoke_result_t<F, Args...>;

// 如果线程池已经停止,直接返回空的future
if (!m_running.load())
throw std::runtime_error("ThreadPool is stopped.");

// 提交任务
// 对参数进行完美转发,使用lambda表达式打包可调用对象和参数
// 创建packaged_task对象进一步包装
auto new_task_ptr = std::make_shared<std::packaged_task<RetType()>>(
[func = std::forward<F>(f),
... args = std::forward<Args>(args)]() mutable {
return func(std::forward<Args>(args)...);
});

std::future<RetType> result = new_task_ptr->get_future();
{
std::lock_guard<std::mutex> mtx_guard(m_mtx);
m_tasks.emplace([new_task_ptr] { (*new_task_ptr)(); });
}

m_cv.notify_one(); // 唤醒一个线程来执行任务

return result; // 返回future对象
}

// 获取当前可用的线程数量
uint32_t get_idle_thread_num() const { return m_idle_thread_num; }

// 获取线程池实例的线程数量
uint32_t get_thread_num() const { return m_thread_num; }

private:
// 开启线程池
void start() {
std::unique_lock<std::mutex> mtx_guard(m_mtx);

m_running.store(true);
m_idle_thread_num.store(m_thread_num);

for (uint32_t i = 0; i < m_thread_num; ++i) {
// 向线程池中填充默认任务
m_pool.emplace_back([this]() { // 必须显式捕获this指针
while (this->m_running.load()) { // 线程池开启时无法跳出循环
std::packaged_task<void()> task;

{
// 获取互斥锁
std::unique_lock<std::mutex> mtx_guard2(m_mtx);

// 通过条件变量让线程陷入等待
// 只有当前的任务队列非空或线程池已经被关闭时才会被成功唤醒
this->m_cv.wait(mtx_guard2, [this] {
return !this->m_running.load()
|| !this->m_tasks.empty();
});

// 如果任务队列为空,代表没有有效任务,直接return
// 此时通常意味着线程池被关闭,可以跳出while循环
if (this->m_tasks.empty()) { return; }

// 以move方式获取队列中的首位任务并执行
task = std::move(this->m_tasks.front());
this->m_tasks.pop();
}

this->m_idle_thread_num--; // 可用线程数-1
task();
this->m_idle_thread_num++; // 可用线程数+1
}
});
}
}

// 关闭线程池
void stop() {
m_running.store(false); // 设置为停止状态
m_cv.notify_all(); // 唤醒所有线程

// 合并所有线程
for (auto &td : m_pool) {
if (td.joinable()) { td.join(); }
}
}

std::mutex m_mtx; // 互斥锁
std::condition_variable m_cv; // 条件变量
std::atomic_bool m_running; // 线程池是否正在运行
std::atomic_uint32_t m_thread_num; // 线程池大小
std::atomic_uint32_t m_idle_thread_num; // 可用的空闲线程数

std::queue<std::packaged_task<void()>> m_tasks; // 任务队列
std::vector<std::thread> m_pool; // 线程池
};

测试代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
#include "simple_thread_pool.hpp"

#include <iostream>
#include <stdexcept>
#include <string>
#include <thread>
#include <vector>

int main() {
try {
// 获取线程池实例
auto pool = SimpleThreadPool{5};

// 提交一些任务
std::vector<std::future<int>> results;
results.reserve(10);

for (int i = 0; i < 10; ++i) {
std::string msg = "current available threads: "
+ std::to_string(pool.get_idle_thread_num())
+ '\n';
std::cout << msg;

// 提交任务
if (i % 3 == 0) {
auto func = [i]() -> int {
std::string msg2 =
"Task " + std::to_string(i) + " executed\n";
std::cout << msg2;
std::this_thread::sleep_for(std::chrono::seconds(3 + i));

throw std::runtime_error("Exception in task "
+ std::to_string(i));
};
results.emplace_back(pool.commit(func));
}
else {
auto func = [i](int j) -> int {
std::string msg2 =
"Task " + std::to_string(i) + " executed\n";
std::cout << msg2;
std::this_thread::sleep_for(std::chrono::seconds(3 + i));
return i * j;
};
results.emplace_back(pool.commit(func, i + 1));
}
}

// 获取任务结果
for (size_t i = 0; i < results.size(); ++i) {
try {
std::string msg = "Result of task " + std::to_string(i) + ": "
+ std::to_string(results[i].get()) + '\n';
std::cout << msg;
}
catch (const std::exception &e) {
std::cerr << "Exception caught: " << e.what() << '\n';
}
catch (...) {
std::cerr << "Unknown exception caught\n";
}
}
}
catch (const std::exception &e) {
std::cerr << "Exception caught: " << e.what() << '\n';
}

return 0;
}