#ifndef NET_FRAME_CONCURRENT_QUEUE_H
#define NET_FRAME_CONCURRENT_QUEUE_H

#include <condition_variable>
#include <cstdint>
#include <mutex>
#include <queue>
#include <utility>

/**
 * Message queue implementation.
 *
 * A FIFO queue whose operations are serialized by one internal mutex.
 * Consumers may either block until a record arrives or poll without
 * blocking. Instances are neither copyable nor assignable.
 *
 * @tparam Type Record type. It must be move-assignable, because Pop()
 *              move-assigns the front element into the caller's object.
 */
template<class Type>
class ConcurrentQueue
{
public:
    ConcurrentQueue()
        : _queue(), _mutex(), _condition()
    {
    }

    virtual ~ConcurrentQueue()
    {
    }

    ConcurrentQueue(const ConcurrentQueue& other) = delete;
    ConcurrentQueue& operator=(const ConcurrentQueue&) = delete;

    /**
     * Appends a record to the back of the queue and wakes one waiting consumer.
     *
     * @param record Record to enqueue; taken by value and moved into the queue.
     */
    void Push(Type record)
    {
        std::lock_guard<std::mutex> lock(_mutex);
        _queue.push(std::move(record));
        _condition.notify_one();
    }

    /**
     * Removes the front record and stores it in @p record.
     *
     * @param record    Receives the dequeued record; left untouched when the
     *                  call returns false.
     * @param isBlocked When true (the default) the call waits until a record
     *                  is available. When false it returns immediately if the
     *                  queue is empty.
     * @return true if a record was dequeued, false only in non-blocking mode
     *         when the queue was empty.
     */
    bool Pop(Type& record, bool isBlocked = true)
    {
        // One lock spans the emptiness check AND the removal. Releasing it in
        // between would let another consumer drain the queue first and would
        // leave front()/pop() racing with concurrent Push() calls.
        std::unique_lock<std::mutex> lock(_mutex);

        if (isBlocked)
        {
            // The predicate form re-checks after every (possibly spurious) wake-up.
            _condition.wait(lock, [this] { return !_queue.empty(); });
        }
        else if (_queue.empty()) // If user wants to retrieve data in non-blocking mode
        {
            return false;
        }

        record = std::move(_queue.front());
        _queue.pop();
        return true;
    }

    /**
     * @return Number of queued records at the moment of the call, converted
     *         to int32_t.
     */
    int32_t Size() const
    {
        std::lock_guard<std::mutex> lock(_mutex);
        return static_cast<int32_t>(_queue.size());
    }

    /**
     * @return true if the queue held no records at the moment of the call.
     */
    bool Empty() const
    {
        std::lock_guard<std::mutex> lock(_mutex);
        return _queue.empty();
    }

private:
    std::queue<Type> _queue;
    mutable std::mutex _mutex;          // mutable so the const observers can lock it
    std::condition_variable _condition; // signalled by Push() when a record arrives
};

#endif //NET_FRAME_CONCURRENT_QUEUE_H
(2)拥有消息队列的线程池的实现
.h文件如下
#ifndef NET_FRAME_THREAD_POOL_H #define NET_FRAME_THREAD_POOL_H #include "ConcurrentQueue.h" #include <vector> #include <queue> #include <memory> #include <thread> #include <mutex> #include <condition_variable> #include <future> #include <functional> #include <stdexcept> #define MIN_THREADS 10 template<class Type> classThreadPool { ThreadPool& operator=(const ThreadPool&) = delete; ThreadPool(const ThreadPool& other) = delete; public: ThreadPool(int32_t threads, std::function<void(Type& record)>handler); virtual ~ThreadPool(); voidSubmit(Type record); private: private: bool_shutdown; int32_t _threads; std::function<void(Type& record)>_handler; std::vector <std::thread>_workers; ConcurrentQueue <Type>_tasks; }; template<class Type> ThreadPool<Type>::ThreadPool(int32_t threads, std::function<void(Type &record)>handler) : _shutdown(false), _threads(threads), _handler(handler), _workers(), _tasks() { if (_threads <MIN_THREADS) _threads =MIN_THREADS; for (int32_t i = 0; i < _threads; ++i) _workers.emplace_back( [this] { while (!_shutdown) { Type record; _tasks.Pop(record, true); _handler(record); } } ); } template<class Type> ThreadPool<Type>::~ThreadPool() { for (std::thread &worker: _workers) worker.join(); } template<class Type> void ThreadPool<Type>::Submit(Type record) { _tasks.Push(record); } #endif //NET_FRAME_THREAD_POOL_H