Cpp 多线程学习笔记——3. future 异步编程
概述
C++通过<future>头文件提供了一组支持异步编程的工具,使用这些工具比直接进行多线程操作更加高级、更加简便。
主要包括如下的类型:
std::future:表示异步操作的结果,这个结果在未来可能可用,支持查询操作的状态,等待操作完成和获取结果。注意用于获取结果的get()方法调用会阻塞当前执行流,直到结果准备就绪。std::promise:承诺在未来提供一个可用的值,通常与std::future配对使用,set_result()可以设置异步操作的结果。可用get_future()提取获得一个关联的std::future对象。std::packaged_task:封装一个函数或可调用对象,使其可以作为异步任务执行。可用get_future()获得一个关联的std::future对象。
还包括如下的函数:
std::async:用于启动异步任务,返回一个std::future对象代表任务的结果,注意我们必须要用变量接收这个返回值,否则当前语句会阻塞式的等待任务结束,因为只有异步任务结束才会销毁返回的临时变量!
这里
std::future和std::promise是成对使用的,std::packaged_task类型和std::async函数则是对异步编程的进一步封装和简化,可以避免显式处理std::promise对象。
实例
我们直接用几个例子来解释异步编程的基本用法,从最简单的例子开始
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
int main() {
std::promise<double> prom;
std::future<double> fut = prom.get_future();
// 在另一个线程中设置结果
std::thread t([&prom](double x) { prom.set_value(sqrt(x)); }, 2.0);
// 等待并展示结果
std::cout << "Result: " << fut.get() << '\n';
t.join();
return 0;
}
代码解释如下:
std::promise代表一个承诺:- 可以以引用传递方式传递到子线程中,并通过
set_value()方法设置值 - 可以使用
get_future()方法获取std::future对象
- 可以以引用传递方式传递到子线程中,并通过
std::future代表一个异步编程的结果:可以通过get()方法获取结果
第二个例子是std::packaged_task的使用 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
int main() {
std::packaged_task<double(double)> task([](double x) { return sqrt(x); });
std::future<double> result = task.get_future();
// 将task移动到另一个线程中执行(加上需要传入的参数)
std::thread th(std::move(task), 2.0);
// 等待并展示结果
std::cout << "Result: " << result.get() << '\n';
th.join();
return 0;
}
std::packaged_task的使用比第一个例子更加简单,省略了std::promise的定义和使用,
自动包装一个可调用对象,并把执行结果传递给std::future对象。
第三个例子是std::async函数的使用,仍然需要提供一个可调用对象(以及需要传入的参数)
1
2
3
4
5
6
7
8
9
10
11
12
13
int main() {
std::future<double> result =
std::async(std::launch::async, [](double x) { return sqrt(x); }, 2.0);
// 等待并展示结果
std::cout << "Result: " << result.get() << '\n';
return 0;
}
std::async的使用更加高级和简洁,完全不需要手动创建和管理线程,返回的std::future对象可以获取可调用对象的结果。
std::future
std::future对象通常不会直接创建,而是通过如下几种方式获得:
std::promise对象的get_future()方法std::packaged_task对象的get_future()方法std::async函数的返回值
这些方式获得的对象自动与对应的异步操作相关联。
std::future对象支持如下方法:
get():获取对应异步操作的结果。如果结果尚未准备好,此调用将阻塞,直到结果可用。(暂不讨论异步操作中的异常问题)wait():阻塞当前线程,进入无限等待状态,直到对应的异步操作完成,无返回值。wait_for:等待指定的时间段,在这段时间内异步操作完成或超时都将结束等待。wait_until:等待直到指定的时间点,在时间点之间异步操作完成或超时都将结束等待。valid():检查std::future对象是否有效,即是否关联了一个异步操作,返回布尔值。
其中get()和wait()都会阻塞当前线程直到任务完成,但是wait()可以多次调用,而get()只允许调用一次。
可以使用wait_for(0)实现非阻塞式的检查 1
2
3
4
5
6if(fut.wait_for(std::chrono::seconds(0)) == std::future_status::ready) {
// 操作已完成
}
else {
// 操作尚未完成
}
std::promise
std::promise通常和std::future成对出现:(使用相同的类型模板参数)
std::promise用于在某一线程中通过set_future()设置某个值std::future则用于在另一线程中通过get()获取这个值。
通常先创建std::promise对象,然后使用get_future()创建与之关联的std::future对象。
由于std::promise对象不支持,我们必须通过移动或者引用传递的方式提供给子线程。
对于更复杂的情况,则需要使用共享的std::shared_future类型,它相比于std::future有更弱的所有权,
允许多个线程都通过get()获取结果。(std::future只能调用一次get())
使用std::shared_future的示例如下 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
int main() {
std::promise<int> prom;
std::thread t([&]() {
std::this_thread::sleep_for(std::chrono::seconds(2));
prom.set_value(42);
});
t.detach();
std::shared_future<int> sharedFuture = prom.get_future().share();
std::cout << "Starting tasks...\n";
// 在两个不同的任务中使用shared_future
std::thread task1([&]() { std::cout << sharedFuture.get() << '\n'; });
std::thread task2([&]() { std::cout << sharedFuture.get() << '\n'; });
task1.join();
task2.join();
std::cout << "Tasks completed.\n";
return 0;
}
虽然多个线程都可以用get()获取结果,但是显然在结果尚未就绪时,对应的线程仍然需要陷入阻塞式的等待中。
std::packaged_task
std::packaged_task只是对可调用对象的一次封装,省略了std::promise的角色,
并且显然和std::promise一样不支持拷贝,只能使用移动的方式传递给子线程,其它没什么好说的。
不同编译器对于
std::packaged_task的实现还不一样,例如gcc允许对其进行移动,但是MSVC似乎不允许。
std::async
std::async的调用方式有两种:
- 第一种方式需要依次传入启动策略、可调用对象、可调用对象需要的参数;
- 第一种方式只需要传入可调用对象、可调用对象需要的参数,使用默认的启动策略。
两种用法示例如下
1 | std::future<double> result1 = |
std::async接受的启动策略通过std::launch枚举类提供:
std::launch::async:表示任务将立刻在另一个新线程中异步执行std::launch::deferred:表示任务会被延迟执行,直到需要提供结果时才会在当前线程中同步执行,例如用户调用std::future::get()或std::future::wait()函数时。std::launch::async | std::launch::deferred:这是上面两个策略的组合,任务既可以在一个单独的线程上异步执行,也可以选择延迟执行,取决于具体实现,不同的编译器和操作系统可能会有不同的默认行为。
在启动策略缺省时,std::async会使用std::launch::async | std::launch::deferred策略。
需要强调的是,我们必须使用std::future对象来接收std::async函数的返回值,否则产生的临时对象会直到异步操作完成才会析构,这会对主线程产生阻塞。
C++ 实现简易线程池
下面提供一个线程池的简易实现(参考 C++
并发三剑客future, promise和async 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
class SimpleThreadPool {
public:
// 构造时自动开启线程池
explicit SimpleThreadPool(uint32_t thread_num)
: m_thread_num(thread_num > 0 ? thread_num : 1) {
start();
}
// 析构时自动关闭线程池
~SimpleThreadPool() { stop(); }
// 禁止复制
SimpleThreadPool(const SimpleThreadPool &) = delete;
SimpleThreadPool &operator=(const SimpleThreadPool &) = delete;
// 提交任务,含参数,返回future
template <class F, class... Args>
auto commit(F &&f, Args &&...args) {
// 返回类型
using RetType = std::invoke_result_t<F, Args...>;
// 如果线程池已经停止,直接返回空的future
if (!m_running.load())
throw std::runtime_error("ThreadPool is stopped.");
// 提交任务
// 对参数进行完美转发,使用lambda表达式打包可调用对象和参数
// 创建packaged_task对象进一步包装
auto new_task_ptr = std::make_shared<std::packaged_task<RetType()>>(
[func = std::forward<F>(f),
... args = std::forward<Args>(args)]() mutable {
return func(std::forward<Args>(args)...);
});
std::future<RetType> result = new_task_ptr->get_future();
{
std::lock_guard<std::mutex> mtx_guard(m_mtx);
m_tasks.emplace([new_task_ptr] { (*new_task_ptr)(); });
}
m_cv.notify_one(); // 唤醒一个线程来执行任务
return result; // 返回future对象
}
// 获取当前可用的线程数量
uint32_t get_idle_thread_num() const { return m_idle_thread_num; }
// 获取线程池实例的线程数量
uint32_t get_thread_num() const { return m_thread_num; }
private:
// 开启线程池
void start() {
std::unique_lock<std::mutex> mtx_guard(m_mtx);
m_running.store(true);
m_idle_thread_num.store(m_thread_num);
for (uint32_t i = 0; i < m_thread_num; ++i) {
// 向线程池中填充默认任务
m_pool.emplace_back([this]() { // 必须显式捕获this指针
while (this->m_running.load()) { // 线程池开启时无法跳出循环
std::packaged_task<void()> task;
{
// 获取互斥锁
std::unique_lock<std::mutex> mtx_guard2(m_mtx);
// 通过条件变量让线程陷入等待
// 只有当前的任务队列非空或线程池已经被关闭时才会被成功唤醒
this->m_cv.wait(mtx_guard2, [this] {
return !this->m_running.load()
|| !this->m_tasks.empty();
});
// 如果任务队列为空,代表没有有效任务,直接return
// 此时通常意味着线程池被关闭,可以跳出while循环
if (this->m_tasks.empty()) { return; }
// 以move方式获取队列中的首位任务并执行
task = std::move(this->m_tasks.front());
this->m_tasks.pop();
}
this->m_idle_thread_num--; // 可用线程数-1
task();
this->m_idle_thread_num++; // 可用线程数+1
}
});
}
}
// 关闭线程池
void stop() {
m_running.store(false); // 设置为停止状态
m_cv.notify_all(); // 唤醒所有线程
// 合并所有线程
for (auto &td : m_pool) {
if (td.joinable()) { td.join(); }
}
}
std::mutex m_mtx; // 互斥锁
std::condition_variable m_cv; // 条件变量
std::atomic_bool m_running; // 线程池是否正在运行
std::atomic_uint32_t m_thread_num; // 线程池大小
std::atomic_uint32_t m_idle_thread_num; // 可用的空闲线程数
std::queue<std::packaged_task<void()>> m_tasks; // 任务队列
std::vector<std::thread> m_pool; // 线程池
};
测试代码如下 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
int main() {
try {
// 获取线程池实例
auto pool = SimpleThreadPool{5};
// 提交一些任务
std::vector<std::future<int>> results;
results.reserve(10);
for (int i = 0; i < 10; ++i) {
std::string msg = "current available threads: "
+ std::to_string(pool.get_idle_thread_num())
+ '\n';
std::cout << msg;
// 提交任务
if (i % 3 == 0) {
auto func = [i]() -> int {
std::string msg2 =
"Task " + std::to_string(i) + " executed\n";
std::cout << msg2;
std::this_thread::sleep_for(std::chrono::seconds(3 + i));
throw std::runtime_error("Exception in task "
+ std::to_string(i));
};
results.emplace_back(pool.commit(func));
}
else {
auto func = [i](int j) -> int {
std::string msg2 =
"Task " + std::to_string(i) + " executed\n";
std::cout << msg2;
std::this_thread::sleep_for(std::chrono::seconds(3 + i));
return i * j;
};
results.emplace_back(pool.commit(func, i + 1));
}
}
// 获取任务结果
for (size_t i = 0; i < results.size(); ++i) {
try {
std::string msg = "Result of task " + std::to_string(i) + ": "
+ std::to_string(results[i].get()) + '\n';
std::cout << msg;
}
catch (const std::exception &e) {
std::cerr << "Exception caught: " << e.what() << '\n';
}
catch (...) {
std::cerr << "Unknown exception caught\n";
}
}
}
catch (const std::exception &e) {
std::cerr << "Exception caught: " << e.what() << '\n';
}
return 0;
}
