摘要:
摘抄自stackflow:https://stackoverflow.com/questions/67784384/c-grpc-clientasyncreaderwriter-how-to-check-if-data-is-available-for-read//BaseclassforasyncbidirRPCshandlers.//Thisissothatthehandlingthreadi
摘抄自stackflow:
https://stackoverflow.com/questions/67784384/c-grpc-clientasyncreaderwriter-how-to-check-if-data-is-available-for-read
//Base class for async bidir RPCs handlers. //This is so that the handling thread is not associated with a specific rpc method. classRpcHandler { //This will be used as the "tag" argument to the various grpc calls. structTagData { enum classType { start_done, read_done, write_done, //add more as needed... }; RpcHandler*handler; Type evt; }; structTagSet { TagSet(RpcHandler*self) : start_done{self, TagData::Type::start_done}, read_done{self, TagData::Type::read_done}, write_done{self, TagData::Type::write_done} {} TagData start_done; TagData read_done; TagData write_done; }; public: RpcHandler() : tags(this) {} virtual ~RpcHandler() = default; //The actual tag objects we'll be passing TagSet tags; virtual void on_ready() = 0; virtual void on_recv() = 0; virtual void on_write_done() = 0; static void handling_thread_main(grpc::CompletionQueue*cq) { void* raw_tag =nullptr; bool ok = false; while (cq->Next(&raw_tag, &ok)) { TagData* tag = reinterpret_cast<TagData*>(raw_tag); if(!ok) { //Handle error } else{ switch (tag->evt) { caseTagData::Type::start_done: tag->handler->on_ready(); break; caseTagData::Type::read_done: tag->handler->on_recv(); break; caseTagData::Type::write_done: tag->handler->on_write_done(); break; } } } } }; void do_something_with_response(Response const&); class MyHandler final : publicRpcHandler { public: using responder_ptr =std::unique_ptr<grpc::ClientAsyncReaderWriter<Request, Response>>; MyHandler(responder_ptr responder) : responder_(std::move(responder)) { //This lock is needed because StartCall() can //cause the handler thread to access the object. std::lock_guard lock(mutex_); responder_->StartCall(&tags.start_done); } ~MyHandler() { //TODO: finish/abort the streaming rpc as appropriate. } void send(const Request&msg) { std::lock_guard lock(mutex_); if (!sending_) { sending_ = true; responder_->Write(msg, &tags.write_done); } else{ //TODO: add some form of synchronous wait, or outright failure //if the queue starts to get too big. queued_msgs_.push(msg); } } private: //When the rpc is ready, queue the first read void on_ready() override{ std::lock_guard l(mutex_); //To synchronize with the constructor responder_->Read(&incoming_, &tags.read_done); }; //When a message arrives, use it, and start reading the next one void on_recv() override{ //incoming_ never leaves the handling thread, so no need to lock //------ If handling is cheap and stays in the handling thread. do_something_with_response(incoming_); responder_->Read(&incoming_, &tags.read_done); //------ If responses is expensive or involves another thread. //Response msg = std::move(incoming_); //responder_->Read(&incoming_, &tags.read_done); //do_something_with_response(msg); }; //When has been sent, send the next one is there is any void on_write_done() override{ std::lock_guard lock(mutex_); if (!queued_msgs_.empty()) { responder_->Write(queued_msgs_.front(), &tags.write_done); queued_msgs_.pop(); } else{ sending_ = false; } }; responder_ptr responder_; //Only ever touched by the handler thread post-construction. Response incoming_; bool sending_ = false; std::queue<Request>queued_msgs_; std::mutex mutex_; //grpc might be thread-safe, MyHandler isn't... }; intmain() { //Start the thread as soon as you have a completion queue. auto cq = std::make_unique<grpc::CompletionQueue>(); std::thread t(RpcHandler::handling_thread_main, cq.get()); //Multiple concurent RPCs sharing the same handling thread: MyHandler handler1(serviceA->MethodA(&context, cq.get())); MyHandler handler2(serviceA->MethodA(&context, cq.get())); MyHandlerB handler3(serviceA->MethodB(&context, cq.get())); MyHandlerC handler4(serviceB->MethodC(&context, cq.get())); }