C++11引入了强大的多线程支持,主要通过<thread>
头文件提供。以下是详细介绍:
使用函数创建线程
xxxxxxxxxx
121
2
3
4void hello() {
5 std::cout << "Hello from thread!" << std::endl;
6}
7
8int main() {
9 std::thread t(hello); // 创建线程
10 t.join(); // 等待线程完成
11 return 0;
12}
使用Lambda表达式
xxxxxxxxxx
101
2
3
4int main() {
5 std::thread t([]() {
6 std::cout << "Lambda thread!" << std::endl;
7 });
8 t.join();
9 return 0;
10}
使用类成员函数
xxxxxxxxxx
161
2
3
4class MyClass {
5public:
6 void member_function() {
7 std::cout << "Member function thread!" << std::endl;
8 }
9};
10
11int main() {
12 MyClass obj;
13 std::thread t(&MyClass::member_function, &obj);
14 t.join();
15 return 0;
16}
join() 和 detach()
xxxxxxxxxx
191
2
3
4
5void worker() {
6 std::this_thread::sleep_for(std::chrono::seconds(2));
7 std::cout << "Worker finished" << std::endl;
8}
9
10int main() {
11 std::thread t(worker);
12
13 // 选择一种方式:
14 t.join(); // 等待线程完成
15 // 或者
16 // t.detach(); // 分离线程,让它独立运行
17
18 return 0;
19}
检查线程状态
xxxxxxxxxx
151
2
3
4int main() {
5 std::thread t([]() {
6 std::cout << "Thread running" << std::endl;
7 });
8
9 if (t.joinable()) {
10 std::cout << "Thread is joinable" << std::endl;
11 t.join();
12 }
13
14 return 0;
15}
按值传递
xxxxxxxxxx
161
2
3
4void print_value(int x, std::string str) {
5 std::cout << "Value: " << x << ", String: " << str << std::endl;
6}
7
8int main() {
9 int num = 42;
10 std::string text = "Hello";
11
12 std::thread t(print_value, num, text);
13 t.join();
14
15 return 0;
16}
按引用传递
xxxxxxxxxx
181
2
3
4void modify_value(int& x) {
5 x = 100;
6}
7
8int main() {
9 int num = 42;
10
11 // 使用std::ref传递引用
12 std::thread t(modify_value, std::ref(num));
13 t.join();
14
15 std::cout << "Modified value: " << num << std::endl; // 输出100
16
17 return 0;
18}
std::mutex
是最基本的互斥锁,同一时间只允许一个线程访问。
xxxxxxxxxx
251
2
3
4
5std::mutex mtx;
6int counter = 0;
7
8void increment() {
9 for (int i = 0; i < 1000; ++i) {
10 mtx.lock(); // 加锁
11 ++counter;
12 mtx.unlock(); // 解锁
13 }
14}
15
16int main() {
17 std::thread t1(increment);
18 std::thread t2(increment);
19
20 t1.join();
21 t2.join();
22
23 std::cout << "Counter: " << counter << std::endl;
24 return 0;
25}
std::recursive_mutex
允许同一线程多次获取同一个锁。
xxxxxxxxxx
231
2
3
4
5std::recursive_mutex rec_mtx;
6
7void recursive_function(int depth) {
8 rec_mtx.lock();
9
10 std::cout << "Depth: " << depth << std::endl;
11
12 if (depth > 0) {
13 recursive_function(depth - 1); // 同一线程再次获取锁
14 }
15
16 rec_mtx.unlock();
17}
18
19int main() {
20 std::thread t(recursive_function, 3);
21 t.join();
22 return 0;
23}
std::timed_mutex
是支持超时的互斥锁。
xxxxxxxxxx
271
2
3
4
5
6std::timed_mutex timed_mtx;
7
8void worker(int id) {
9 // 尝试在2秒内获取锁
10 if (timed_mtx.try_lock_for(std::chrono::seconds(2))) {
11 std::cout << "Thread " << id << " got the lock" << std::endl;
12 std::this_thread::sleep_for(std::chrono::seconds(3));
13 timed_mtx.unlock();
14 } else {
15 std::cout << "Thread " << id << " timeout" << std::endl;
16 }
17}
18
19int main() {
20 std::thread t1(worker, 1);
21 std::thread t2(worker, 2);
22
23 t1.join();
24 t2.join();
25
26 return 0;
27}
std::recursive_timed_mutex
是结合了递归和定时功能的互斥锁。
xxxxxxxxxx
131std::recursive_timed_mutex rec_timed_mtx;
2
3void timed_recursive_function(int depth) {
4 if (rec_timed_mtx.try_lock_for(std::chrono::milliseconds(100))) {
5 std::cout << "Depth: " << depth << std::endl;
6
7 if (depth > 0) {
8 timed_recursive_function(depth - 1);
9 }
10
11 rec_timed_mtx.unlock();
12 }
13}
最简单的RAII锁管理器,构造时加锁,析构时自动解锁。
xxxxxxxxxx
361
2
3
4
5std::mutex mtx;
6int shared_data = 0;
7
8void safe_increment() {
9 std::lock_guard<std::mutex> lock(mtx); // 自动加锁
10 ++shared_data;
11 // 函数结束时自动解锁
12}
13
14void risky_function() {
15 std::lock_guard<std::mutex> lock(mtx);
16 ++shared_data;
17
18 if (shared_data > 5) {
19 throw std::runtime_error("Error!"); // 异常时也会自动解锁
20 }
21}
22
23int main() {
24 std::vector<std::thread> threads;
25
26 for (int i = 0; i < 10; ++i) {
27 threads.emplace_back(safe_increment);
28 }
29
30 for (auto& t : threads) {
31 t.join();
32 }
33
34 std::cout << "Shared data: " << shared_data << std::endl;
35 return 0;
36}
更灵活的锁管理器,支持延迟加锁、条件变量等。
xxxxxxxxxx
471
2
3
4
5
6std::mutex mtx;
7std::condition_variable cv;
8bool ready = false;
9
10void worker() {
11 std::unique_lock<std::mutex> lock(mtx);
12
13 // 等待条件满足
14 cv.wait(lock, []{ return ready; });
15
16 std::cout << "Worker thread proceeding..." << std::endl;
17}
18
19void setter() {
20 std::this_thread::sleep_for(std::chrono::seconds(1));
21
22 {
23 std::unique_lock<std::mutex> lock(mtx);
24 ready = true;
25 }
26
27 cv.notify_one();
28}
29
30// 手动控制锁的示例
31void flexible_locking() {
32 std::unique_lock<std::mutex> lock(mtx, std::defer_lock); // 延迟加锁
33
34 // 做一些不需要锁的工作
35 std::this_thread::sleep_for(std::chrono::milliseconds(100));
36
37 lock.lock(); // 手动加锁
38 // 临界区代码
39 lock.unlock(); // 手动解锁
40
41 // 做更多不需要锁的工作
42 std::this_thread::sleep_for(std::chrono::milliseconds(100));
43
44 lock.lock(); // 再次加锁
45 // 更多临界区代码
46 // 析构时自动解锁
47}
用于读写锁的共享锁定。
xxxxxxxxxx
201
2
3// C++14
4
5
6std::shared_mutex rw_mutex;
7int shared_data = 0;
8
9void reader(int id) {
10 std::shared_lock<std::shared_mutex> lock(rw_mutex); // 共享锁
11 std::cout << "Reader " << id << " reads: " << shared_data << std::endl;
12 std::this_thread::sleep_for(std::chrono::milliseconds(100));
13}
14
15void writer(int id) {
16 std::unique_lock<std::shared_mutex> lock(rw_mutex); // 独占锁
17 ++shared_data;
18 std::cout << "Writer " << id << " writes: " << shared_data << std::endl;
19 std::this_thread::sleep_for(std::chrono::milliseconds(100));
20}
同时锁定多个互斥锁,避免死锁。
xxxxxxxxxx
331
2
3
4
5std::mutex mtx1, mtx2;
6
7void worker1() {
8 std::lock(mtx1, mtx2); // 同时锁定两个互斥锁
9 std::lock_guard<std::mutex> lock1(mtx1, std::adopt_lock);
10 std::lock_guard<std::mutex> lock2(mtx2, std::adopt_lock);
11
12 std::cout << "Worker 1 acquired both locks" << std::endl;
13 std::this_thread::sleep_for(std::chrono::milliseconds(100));
14}
15
16void worker2() {
17 std::lock(mtx2, mtx1); // 不同的顺序,但不会死锁
18 std::lock_guard<std::mutex> lock1(mtx2, std::adopt_lock);
19 std::lock_guard<std::mutex> lock2(mtx1, std::adopt_lock);
20
21 std::cout << "Worker 2 acquired both locks" << std::endl;
22 std::this_thread::sleep_for(std::chrono::milliseconds(100));
23}
24
25int main() {
26 std::thread t1(worker1);
27 std::thread t2(worker2);
28
29 t1.join();
30 t2.join();
31
32 return 0;
33}
xxxxxxxxxx
271// 策略1:固定顺序获取锁
2void fixed_order_locking() {
3 // 总是按照地址顺序获取锁
4 std::mutex* first = &mtx1 < &mtx2 ? &mtx1 : &mtx2;
5 std::mutex* second = &mtx1 < &mtx2 ? &mtx2 : &mtx1;
6
7 std::lock_guard<std::mutex> lock1(*first);
8 std::lock_guard<std::mutex> lock2(*second);
9}
10
11// 策略2:使用std::lock
12void simultaneous_locking() {
13 std::lock(mtx1, mtx2);
14 std::lock_guard<std::mutex> lock1(mtx1, std::adopt_lock);
15 std::lock_guard<std::mutex> lock2(mtx2, std::adopt_lock);
16}
17
18// 策略3:使用超时
19void timeout_locking() {
20 std::unique_lock<std::timed_mutex> lock1(timed_mtx);
21
22 if (auto lock2 = std::unique_lock<std::timed_mutex>(timed_mtx, std::try_to_lock)) {
23 // 成功获取两个锁
24 } else {
25 // 处理获取锁失败的情况
26 }
27}
非阻塞的锁获取尝试。
xxxxxxxxxx
321
2
3
4
5std::mutex mtx;
6
7void try_lock_example() {
8 if (mtx.try_lock()) {
9 std::cout << "Got the lock!" << std::endl;
10 std::this_thread::sleep_for(std::chrono::seconds(1));
11 mtx.unlock();
12 } else {
13 std::cout << "Failed to get the lock" << std::endl;
14 }
15}
16
17void try_lock_multiple() {
18 std::mutex mtx1, mtx2, mtx3;
19
20 // 尝试同时获取多个锁
21 int result = std::try_lock(mtx1, mtx2, mtx3);
22
23 if (result == -1) {
24 // 成功获取所有锁
25 std::cout << "Got all locks" << std::endl;
26 mtx1.unlock();
27 mtx2.unlock();
28 mtx3.unlock();
29 } else {
30 std::cout << "Failed at lock " << result << std::endl;
31 }
32}
延迟加锁,构造时不立即获取锁。
xxxxxxxxxx
111std::mutex mtx;
2
3void deferred_locking() {
4 std::unique_lock<std::mutex> lock(mtx, std::defer_lock);
5
6 // 做一些不需要锁的工作
7 std::this_thread::sleep_for(std::chrono::milliseconds(100));
8
9 lock.lock(); // 现在才获取锁
10 // 临界区代码
11}
尝试获取锁,不阻塞。
xxxxxxxxxx
101void try_to_lock_example() {
2 std::unique_lock<std::mutex> lock(mtx, std::try_to_lock);
3
4 if (lock.owns_lock()) {
5 std::cout << "Successfully acquired lock" << std::endl;
6 // 临界区代码
7 } else {
8 std::cout << "Failed to acquire lock" << std::endl;
9 }
10}
接管已经获取的锁。
xxxxxxxxxx
81void adopt_lock_example() {
2 mtx.lock(); // 手动获取锁
3
4 std::lock_guard<std::mutex> lock(mtx, std::adopt_lock); // 接管锁
5
6 // 临界区代码
7 // lock析构时会自动释放锁
8}
锁类型 | 性能 | 特性 | 使用场景 |
---|---|---|---|
mutex | 高 | 基础互斥 | 一般同步 |
recursive_mutex | 中 | 递归锁定 | 递归函数 |
timed_mutex | 中 | 超时机制 | 需要超时控制 |
shared_mutex | 中 | 读写分离 | 读多写少 |
C++中使用条件变量可以使用std::condition_variable
。
xxxxxxxxxx
501
2
3
4
5
6
7std::mutex mtx;
8std::condition_variable cv;
9std::queue<int> data_queue;
10bool finished = false;
11
12void producer() {
13 for (int i = 0; i < 10; ++i) {
14 std::unique_lock<std::mutex> lock(mtx);
15 data_queue.push(i);
16 std::cout << "Produced: " << i << std::endl;
17 cv.notify_one(); // 通知等待的线程
18 }
19
20 std::unique_lock<std::mutex> lock(mtx);
21 finished = true;
22 cv.notify_all();
23}
24
25void consumer() {
26 while (true) {
27 std::unique_lock<std::mutex> lock(mtx);
28
29 // 等待条件满足
30 cv.wait(lock, []() { return !data_queue.empty() || finished; });
31
32 while (!data_queue.empty()) {
33 int value = data_queue.front();
34 data_queue.pop();
35 std::cout << "Consumed: " << value << std::endl;
36 }
37
38 if (finished) break;
39 }
40}
41
42int main() {
43 std::thread prod(producer);
44 std::thread cons(consumer);
45
46 prod.join();
47 cons.join();
48
49 return 0;
50}
实现原理:
CAS机制:会将变量的预期值A与在内存中的值V进行比较,如果V与A是相等,就可以将该值改为B;如果V与A的值是不相等,那么就不能将其改为新值B。
xxxxxxxxxx
231
2
3
4
5std::atomic<int> atomic_counter(0);
6
7void atomic_increment() {
8 for (int i = 0; i < 1000; ++i) {
9 atomic_counter++; // 原子操作,无需显式锁
10 }
11}
12
13int main() {
14 std::thread t1(atomic_increment);
15 std::thread t2(atomic_increment);
16
17 t1.join();
18 t2.join();
19
20 std::cout << "Atomic counter: " << atomic_counter << std::endl;
21
22 return 0;
23}
xxxxxxxxxx
871
2
3
4
5
6
7
8using std::condition_variable;
9using std::cout;
10using std::endl;
11using std::mutex;
12using std::queue;
13using std::thread;
14using std::unique_lock;
15
16class TaskQueue {
17public:
18 TaskQueue(size_t capacity) : _capacity(capacity) {}
19 bool isFull() {
20 return _queue.size() >= _capacity;
21 }
22 bool isEmpty() {
23 return _queue.empty();
24 }
25 void push(const int &value) {
26 unique_lock<mutex> ul{_mutex};
27 while (isFull()) {
28 _notFull.wait(ul);
29 }
30 _queue.push(value);
31 cout << "生产" << value << endl
32 << "--------" << endl;
33 _notEmpty.notify_one();
34 }
35 int pop() {
36 unique_lock<mutex> ul{_mutex};
37 while (isEmpty()) {
38 _notEmpty.wait(ul);
39 }
40 int temp = _queue.front();
41 _queue.pop();
42 cout << "消费" << temp << endl
43 << "--------" << endl;
44 _notFull.notify_one();
45 return temp;
46 }
47
48private:
49 size_t _capacity;
50 queue<int> _queue;
51 mutex _mutex;
52 condition_variable _notEmpty;
53 condition_variable _notFull;
54};
55
56class Producer {
57public:
58 void produce(TaskQueue &taskQueue) {
59 srand(clock());
60 int i = 10;
61 while (i--) {
62 taskQueue.push(rand() % 100);
63 }
64 }
65};
66
67class Consumer {
68public:
69 void consume(TaskQueue &taskQueue) {
70 int i = 10;
71 while (i--) {
72 taskQueue.pop();
73 }
74 }
75};
76
77int main(int argc, char *argv[]) {
78 TaskQueue taskQueue{5};
79 Producer producer;
80 Consumer consumer;
81 thread producerThread{&Producer::produce, &producer, std::ref(taskQueue)};
82 thread consumerThread{&Consumer::consume, &consumer, std::ref(taskQueue)};
83
84 producerThread.join();
85 consumerThread.join();
86 return 0;
87}
TaskQueue(任务队列)
私有成员:
_capacity
: 队列容量
_queue
: 存储整数的队列
_mutex
: 互斥锁
_notEmpty
, _notFull
: 条件变量
公有方法:
构造函数、判空判满、入队出队操作
Producer(生产者)
包含 produce()
方法,向队列生产数据
Consumer(消费者)
包含 consume()
方法,从队列消费数据
xxxxxxxxxx
1471
2
3
4
5
6
7
8
9
10using std::condition_variable;
11using std::cout;
12using std::endl;
13using std::mutex;
14using std::queue;
15using std::thread;
16using std::unique_lock;
17using std::unique_ptr;
18using std::vector;
19
20class Task {
21public:
22 virtual void process() = 0;
23 virtual ~Task() {}
24};
25
26class TaskQueue {
27public:
28 TaskQueue(size_t capacity) : _capacity(capacity) {}
29 bool isFull() {
30 return _queue.size() >= _capacity;
31 }
32 bool isEmpty() {
33 return _queue.empty();
34 }
35 void push(Task *task) {
36 unique_lock<mutex> ul{_mutex};
37 while (isFull()) {
38 _notFull.wait(ul);
39 }
40 _queue.push(task);
41 _notEmpty.notify_one();
42 }
43 Task *pop() {
44 unique_lock<mutex> ul{_mutex};
45 while (isEmpty() && _flag) {
46 _notEmpty.wait(ul);
47 }
48 if (_flag) {
49 Task *temp = _queue.front();
50 _queue.pop();
51 _notFull.notify_one();
52 return temp;
53 }
54 return nullptr;
55 }
56 void wakeAll() {
57 _flag = false;
58 _notEmpty.notify_all();
59 }
60
61private:
62 size_t _capacity;
63 queue<Task *> _queue;
64 mutex _mutex;
65 condition_variable _notEmpty;
66 condition_variable _notFull;
67 bool _flag = true;
68};
69
70class ThreadPool {
71public:
72 ThreadPool(size_t threadNum, size_t queueSize)
73 : _threadNum(threadNum),
74 _taskQueue(queueSize),
75 _queueSize(queueSize) {}
76 void start() {
77 for (int i = 0; i < _threadNum; ++i) {
78 _threads.push_back(thread{&ThreadPool::doTask, this});
79 }
80 }
81 void stop() {
82 while (!_taskQueue.isEmpty()) {
83 std::this_thread::sleep_for(std::chrono::seconds(1));
84 }
85 _isAlive = false;
86 _taskQueue.wakeAll();
87 for (auto &th : _threads) {
88 th.join();
89 }
90 }
91 void addTask(Task *task) {
92 if (task) {
93 _taskQueue.push(task);
94 }
95 }
96 Task *getTask() {
97 return _taskQueue.pop();
98 }
99 void doTask() {
100 while (_isAlive) {
101 Task *task = getTask();
102 if (task && _isAlive) {
103 task->process();
104 }
105 }
106 }
107
108private:
109 TaskQueue _taskQueue;
110 size_t _threadNum;
111 vector<thread> _threads;
112 size_t _queueSize;
113 bool _isAlive = true;
114};
115
116int taskNum = 1;
117mutex mtx;
118
119class Task1 : public Task {
120public:
121 void process() override {
122 unique_lock<mutex> ul{mtx};
123 cout << "任务" << taskNum++ << endl;
124 }
125};
126
127class Task2 : public Task {
128public:
129 void process() override {
130 // 任务2
131 }
132};
133
134int main(int argc, char *argv[]) {
135 ThreadPool pool{3, 10};
136 pool.start();
137 vector<unique_ptr<Task>> tasks;
138 int i = 20;
139 while (i--) {
140 unique_ptr<Task> task{new Task1};
141 pool.addTask(task.get());
142 tasks.push_back(std::move(task));
143 cout << i << endl;
144 }
145 pool.stop();
146 return 0;
147}
1. Task(抽象基类)
类型 : 抽象基类
方法 :
process()
: 纯虚函数,定义任务处理接口
~Task()
: 虚析构函数,确保正确的多态析构
2. TaskQueue(任务队列)
私有成员 :
_capacity
: 队列容量限制
_queue
: 存储Task指针的队列
_mutex
: 互斥锁,保护共享资源
_notEmpty
: 条件变量,通知队列非空
_notFull
: 条件变量,通知队列未满
_flag
: 布尔标志,控制队列状态
公有方法 :
TaskQueue(size_t capacity)
: 构造函数
isFull()
: 检查队列是否已满
isEmpty()
: 检查队列是否为空
push(Task *task)
: 添加任务(阻塞式)
pop()
: 获取任务(阻塞式)
wakeAll()
: 唤醒所有等待线程
3. ThreadPool(线程池)
私有成员 :
_taskQueue
: 任务队列实例
_threadNum
: 线程数量
_threads
: 工作线程容器
_queueSize
: 队列大小
_isAlive
: 线程池状态标志
公有方法 :
ThreadPool(size_t threadNum, size_t queueSize)
: 构造函数
start()
: 启动线程池
stop()
: 停止线程池
addTask(Task *task)
: 添加任务
getTask()
: 获取任务
doTask()
: 工作线程执行函数
4. Task1和Task2(具体任务实现)
Task1
: 实现了 process()
方法,输出任务编号
Task2
: 实现了 process()
方法,当前为空实现
5. 关系说明
继承关系 : Task1
和Task2
继承自Task
抽象基类
组合关系 : ThreadPool
包含TaskQueue
实例
依赖关系 :
TaskQueue
管理Task
指针
ThreadPool
使用Task
进行任务处理
Task1
使用全局变量taskNum
和mtx
xxxxxxxxxx
2131
2
3
4
5
6
7
8
9
10
11using std::condition_variable;
12using std::cout;
13using std::endl;
14using std::mutex;
15using std::queue;
16using std::thread;
17using std::unique_lock;
18using std::vector;
19using std::function;
20using std::bind;
21
22// 任务类型定义为函数对象
23using Task = std::function<void()>;
24
25class TaskQueue {
26public:
27 TaskQueue(size_t capacity) : _capacity(capacity) {}
28
29 bool isFull() {
30 return _queue.size() >= _capacity;
31 }
32
33 bool isEmpty() {
34 return _queue.empty();
35 }
36
37 void push(const Task& task) {
38 unique_lock<mutex> ul{_mutex};
39 while (isFull()) {
40 _notFull.wait(ul);
41 }
42 _queue.push(task);
43 _notEmpty.notify_one();
44 }
45
46 Task pop() {
47 unique_lock<mutex> ul{_mutex};
48 while (isEmpty() && _flag) {
49 _notEmpty.wait(ul);
50 }
51 if (_flag && !_queue.empty()) {
52 Task task = _queue.front();
53 _queue.pop();
54 _notFull.notify_one();
55 return task;
56 }
57 return nullptr; // 返回空函数对象
58 }
59
60 void wakeAll() {
61 _flag = false;
62 _notEmpty.notify_all();
63 }
64
65private:
66 size_t _capacity;
67 queue<Task> _queue;
68 mutex _mutex;
69 condition_variable _notEmpty;
70 condition_variable _notFull;
71 bool _flag = true;
72};
73
74class ThreadPool {
75public:
76 ThreadPool(size_t threadNum, size_t queueSize)
77 : _threadNum(threadNum),
78 _taskQueue(queueSize),
79 _queueSize(queueSize) {}
80
81 void start() {
82 for (size_t i = 0; i < _threadNum; ++i) {
83 _threads.emplace_back(&ThreadPool::doTask, this);
84 }
85 }
86
87 void stop() {
88 while (!_taskQueue.isEmpty()) {
89 std::this_thread::sleep_for(std::chrono::milliseconds(100));
90 }
91 _isAlive = false;
92 _taskQueue.wakeAll();
93 for (auto& th : _threads) {
94 if (th.joinable()) {
95 th.join();
96 }
97 }
98 }
99
100 void addTask(const Task& task) {
101 if (task) {
102 _taskQueue.push(task);
103 }
104 }
105
106 // 模板方法,支持任意可调用对象
107 template<typename F, typename... Args>
108 void submit(F&& f, Args&&... args) {
109 auto task = std::bind(std::forward<F>(f), std::forward<Args>(args)...);
110 addTask(task);
111 }
112
113private:
114 Task getTask() {
115 return _taskQueue.pop();
116 }
117
118 void doTask() {
119 while (_isAlive) {
120 Task task = getTask();
121 if (task && _isAlive) {
122 task(); // 直接调用函数对象
123 }
124 }
125 }
126
127 TaskQueue _taskQueue;
128 size_t _threadNum;
129 vector<thread> _threads;
130 size_t _queueSize;
131 bool _isAlive = true;
132};
133
134// 全局变量
135int taskNum = 1;
136mutex mtx;
137
138// 普通函数任务
139void task1() {
140 unique_lock<mutex> ul{mtx};
141 cout << "任务" << taskNum++ << endl;
142}
143
144void task2() {
145 unique_lock<mutex> ul{mtx};
146 cout << "任务2执行" << endl;
147}
148
149// 带参数的任务函数
150void taskWithParam(int id, const std::string& message) {
151 unique_lock<mutex> ul{mtx};
152 cout << "任务" << id << ": " << message << endl;
153}
154
155// 任务类
156class TaskProcessor {
157public:
158 void processTask(int id) {
159 unique_lock<mutex> ul{mtx};
160 cout << "TaskProcessor处理任务" << id << endl;
161 }
162
163 void processTaskWithMessage(int id, const std::string& msg) {
164 unique_lock<mutex> ul{mtx};
165 cout << "TaskProcessor任务" << id << ": " << msg << endl;
166 }
167};
168
169int main(int argc, char* argv[]) {
170 ThreadPool pool{3, 10};
171 pool.start();
172
173 TaskProcessor processor;
174
175 // 1. 使用普通函数
176 for (int i = 0; i < 5; ++i) {
177 pool.addTask(task1);
178 }
179
180 // 2. 使用lambda表达式
181 for (int i = 0; i < 3; ++i) {
182 pool.addTask([i]() {
183 unique_lock<mutex> ul{mtx};
184 cout << "Lambda任务" << i << endl;
185 });
186 }
187
188 // 3. 使用std::bind绑定带参数的函数
189 for (int i = 0; i < 3; ++i) {
190 pool.addTask(std::bind(taskWithParam, i, "绑定参数"));
191 }
192
193 // 4. 使用submit模板方法
194 for (int i = 0; i < 3; ++i) {
195 pool.submit(taskWithParam, i + 100, "模板提交");
196 }
197
198 // 5. 绑定成员函数
199 for (int i = 0; i < 3; ++i) {
200 pool.addTask(std::bind(&TaskProcessor::processTask, &processor, i));
201 }
202
203 // 6. 使用submit绑定成员函数
204 for (int i = 0; i < 3; ++i) {
205 pool.submit(&TaskProcessor::processTaskWithMessage, &processor, i + 200, "成员函数");
206 }
207
208 // 等待一段时间让任务执行
209 std::this_thread::sleep_for(std::chrono::seconds(2));
210
211 pool.stop();
212 return 0;
213}
1. 任务类型变化
原版 : 使用抽象基类 Task
和虚函数 process
()
新版 : 使用 std::function<void()>
作为任务类型
2. 灵活性提升
支持多种任务类型 :
普通函数
Lambda表达式
绑定的成员函数
带参数的函数(通过 std::bind
)
函数对象
3. 新增功能
submit
模板方法 : 支持完美转发和自动参数绑定
类型安全 : 编译时类型检查
零开销抽象 : 避免虚函数调用开销
4. 对比
面向对象版本 : 需要继承Task类,重写虚函数
函数对象版本 : 直接使用函数,更加灵活和高效
性能 : 避免虚函数调用,支持内联优化
易用性 : 无需定义新类,直接使用现有函数