小天管理 发表于 6月17日 发表于 6月17日 最近在找一个简单的 C++11 线程池实现,发现网上有很多相关的代码,在 CSDN 网上看到一个比较简洁的。但是总感觉是不是实现错了。 Any 类 noncopyable 的,仅仅支持移动语义, Result 类使用了 Any 实例作为成员变量,那么 Result 类应该也是 noncopyable 的, Result SubmitTask(std::shared_ptr<Task> taskPtr);直接使用了复制语义,应该是有问题吧,可是代码能够被 vs2022 正常编译。 threadpool.h #pragma once #include <vector> #include <cstdint> #include <queue> #include <memory> #include <atomic> #include <mutex> #include <thread> #include <condition_variable> #include <functional> #include <sstream> #include <unordered_map> // Any 类型:可以接收任意数据的类型 // 任意其他类型 template // 能让一个类型指向其他类型,基类指针可以指向子类 class Any { public: Any() = default; ~Any() = default; Any(const Any&) = delete; Any& operator=(const Any&) = delete; Any(Any&&) = default; Any& operator=(Any&&) = default; template<typename T> Any(T data) : m_base(std::make_unique<Derive<T>>(data)) {} template<typename T> T cast_() { Derive<T>* pd = dynamic_cast<Derive<T>*>(m_base.get()); if (pd == nullptr) { throw "type is unmath!!"; } return pd->m_data; } private: // 基类 class Base { public: virtual ~Base() = default; }; // 派生类 template<typename T> class Derive : public Base { public: Derive(T data) : m_data(data) {} public: T m_data; }; private: std::unique_ptr<Base> m_base; }; // 实现一个信号量类 class Semaphore { public: Semaphore(int limit = 0) : m_resLimit(limit) {} ~Semaphore() = default; // 获取一个信号量资源 void wait() { std::unique_lock<std::mutex> lock(m_mtx); // 如果没有资源,阻塞线程 while (m_resLimit < 1) { m_cond.wait(lock); } m_resLimit--; } // 增加一个信号量资源 void post() { std::unique_lock<std::mutex> lock(m_mtx); m_resLimit++; m_cond.notify_all(); } private: int m_resLimit; // 资源量 std::mutex m_mtx; std::condition_variable m_cond; }; // Task 类型前置声明 class Task; // 实现接收提交到线程池的 task 任务执行完成后的返回值类型 class Result { public: Result(std::shared_ptr<Task> task, bool isValid = true); ~Result() = default; // setVal void setVal(Any result); // get 方法,用户调用这个方法获取 task 的返回值 Any get(); private: Any m_any; Semaphore m_sem; std::shared_ptr<Task> m_task; std::atomic_bool m_isValid; }; // 任务抽象基类 class Task { public: void exec(); void setResult(Result* res); virtual Any run() = 0; private: Result* m_result{ nullptr }; // 不要用智能指针,task 含有 Result Result 含有 task ,可能导致问题 }; class MyTask : public Task { public: MyTask(int start, int end) : m_start(start), m_end(end) {} Any run() { std::ostringstream ostr; ostr << std::this_thread::get_id(); printf("thead %s, task start \n", ostr.str().c_str()); uint64_t sum = 0; for (int i = m_start; i <= m_end; i++) { sum += i; } printf("sum %llu\n", sum); std::this_thread::sleep_for(std::chrono::seconds(2)); printf("thread %s, task finish \n", ostr.str().c_str()); return sum; } private: int m_start; int m_end; }; enum ThreadPoolMode { MODE_FIXED, // 固定数量的线程 MODE_CACHED, // 线程数量可以动态增长 }; class Thread { public: using ThreadFunc = std::function<void(int)>; Thread(ThreadFunc func); ~Thread(); void Start(); int GetId() { return m_threadId; } private: ThreadFunc m_func; static int generateId; int m_threadId; }; class ThreadPool { public: ThreadPool(); ~ThreadPool(); // 设置线程池工作模式 void SetMode(ThreadPoolMode mode); // 设置任务数量上限 void SetTaskQueMaxThreshold(int value); // 给线程池提交任务 Result SubmitTask(std::shared_ptr<Task> taskPtr); // 开启线程池 void Start(int initThreadSize = std::thread::hardware_concurrency()); private: ThreadPool(const ThreadPool&) = delete; ThreadPool& operator=(const ThreadPool&) = delete; // 定义线程函数 void ThreadFunc(int threadId); bool CheckRunningState() const; private: std::unordered_map<int, std::unique_ptr<Thread>> m_threadMap; // 线程列表 int m_initThreadSize; // 初始的线程数量 std::atomic_int m_curThreadSize; // 当前线程数量 std::queue<std::shared_ptr<Task>> m_taskQue; // 任务队列 std::atomic_int m_taskSize; // 任务的数量 int m_taskQueMaxThreshold; // 任务队列的数量上限 std::mutex m_taskQueMtx; // 保证任务队列的线程安全 std::condition_variable m_taskQueNotFullCv; // 表示任务队列不满 std::condition_variable m_taskQueNotEmptyCv; // 表示任务队列不空 std::condition_variable m_exitCv; // 退出线程池 ThreadPoolMode m_poolMode; // 当前线程池的工作模式 std::atomic_bool m_isPoolRuning; // 当前线程工作状态 }; threadpool.cpp #include "threadpool.h" #include <functional> #include <iostream> constexpr int TASK_MAX_THRESHOLD = 1024; ThreadPool::ThreadPool() : m_initThreadSize(4), m_taskSize(0), m_taskQueMaxThreshold(TASK_MAX_THRESHOLD), m_poolMode(ThreadPoolMode::MODE_FIXED) { } ThreadPool::~ThreadPool() { m_isPoolRuning = false; std::unique_lock<std::mutex> lock(m_taskQueMtx); // 线程 要么在阻塞中 要么在工作中 while (m_threadMap.size() > 0) { m_taskQueNotEmptyCv.notify_all(); // 唤醒等待的工作线程 m_exitCv.wait(lock); } } void ThreadPool::SetMode(ThreadPoolMode mode) { if (m_isPoolRuning) { return; } // 线程池启动后,不允许设置线程池一些参数 m_poolMode = mode; } void ThreadPool::SetTaskQueMaxThreshold(int value) { if (m_isPoolRuning) { return; } m_taskQueMaxThreshold = value; } Result ThreadPool::SubmitTask(std::shared_ptr<Task> taskPtr) { // 获取锁 std::unique_lock<std::mutex> lock(m_taskQueMtx); // 线程通信,检查任务队列是否有空余 while (m_taskQue.size() >= m_taskQueMaxThreshold) { // 用于提交任务,不能阻塞太长时间,如果超过 1s ,给用户返回提交失败 if (m_taskQueNotFullCv.wait_for(lock, std::chrono::seconds(1)) == std::cv_status::timeout) { return Result(taskPtr, false); } } // 如果有空余,把任务提交到任务队列中 m_taskQue.emplace(taskPtr); m_taskSize++; // 因为新放了任务,任务队列肯定不为空了,在 m_taskQueNotEmptyCv 进行通知,赶快分配线程执行这个任务 m_taskQueNotEmptyCv.notify_all(); return Result(taskPtr); } void ThreadPool::Start(int initThreadSize) { m_initThreadSize = initThreadSize; m_curThreadSize = initThreadSize; m_isPoolRuning = true; // 创建线程对象 for (int i = 0; i < m_initThreadSize; i++) { auto ptr = std::make_unique<Thread>(std::bind(&ThreadPool::ThreadFunc, this, std::placeholders::_1)); int threadId = ptr->GetId(); m_threadMap.emplace(threadId, std::move(ptr)); } // 启动所有线程 for (auto iter = m_threadMap.cbegin(); iter != m_threadMap.end(); iter++) { iter->second->Start(); } } void ThreadPool::ThreadFunc(int threadId) { while (true) { // 获取锁 std::unique_lock<std::mutex> lock(m_taskQueMtx); std::ostringstream ostr; ostr << std::this_thread::get_id(); printf("thead %s, To Get task \n", ostr.str().c_str()); // 判断任务队列是否为空 while (m_taskQue.empty()) { if (!m_isPoolRuning) { m_threadMap.erase(threadId); m_exitCv.notify_all(); printf("deconstructor thread exit, id = %d\n", threadId); return; } m_taskQueNotEmptyCv.wait(lock); } printf("thead %s, Getted task \n", ostr.str().c_str()); // 不为空,获取任务 auto taskPtr = m_taskQue.front(); // front()返回引用,auto 忽略引用属性,正好满足需要 m_taskQue.pop(); m_taskSize--; lock.unlock(); // 释放锁; // 如果任务队列还有任务,通知其他线程执行任务 if (m_taskQue.size() > 0) { m_taskQueNotEmptyCv.notify_all(); } // 通知队列已经不满 m_taskQueNotFullCv.notify_all(); taskPtr->exec(); if (!m_isPoolRuning) { m_threadMap.erase(threadId); m_exitCv.notify_all(); printf("deconstructor thread exit, id = %d\n", threadId); return; } } } bool ThreadPool::CheckRunningState() const { if (m_isPoolRuning) { return true; } return false; } // 线程方法 int Thread::generateId = 0; Thread::Thread(ThreadFunc func) : m_func(func), m_threadId(generateId++) { } Thread::~Thread() { } void Thread::Start() { std::thread t(m_func, m_threadId); t.detach(); } Result::Result(std::shared_ptr<Task> task, bool isValid) : m_task(task), m_isValid(isValid) { m_task->setResult(this); } void Result::setVal(Any result) { m_any = std::move(result); m_sem.post(); // 通知已经获得结果 } Any Result::get() { if (!m_isValid) { return ""; } m_sem.wait(); // 等待结果 return std::move(m_any); } void Task::exec() { if (m_result != nullptr) { Any result = run(); // 这里发生多态调用 m_result->setVal(std::move(result)); } } void Task::setResult(Result* res) { m_result = res; } main.cpp #include "threadpool.h" #include <chrono> #include <iostream> using std::cout; using std::endl; int main(int argc, char* argv[]) { { ThreadPool pool; pool.Start(4); Result res1 = pool.SubmitTask(std::make_shared<MyTask>(1, 100000000)); Result res2 = pool.SubmitTask(std::make_shared<MyTask>(100000001, 200000000)); Result res3 = pool.SubmitTask(std::make_shared<MyTask>(200000001, 300000000)); //uint64_t sum1 = res1.get().cast_<uint64_t>(); //uint64_t sum2 = res2.get().cast_<uint64_t>(); //uint64_t sum3 = res3.get().cast_<uint64_t>(); //cout << (sum1 + sum2 + sum3) << endl; } cout << "main over" << endl; getchar(); return 0; }
已推荐帖子