GRPC异步双向流处理的流程伪代码

摘要:
摘抄自stackflow:https://stackoverflow.com/questions/67784384/c-grpc-clientasyncreaderwriter-how-to-check-if-data-is-available-for-read//BaseclassforasyncbidirRPCshandlers.//Thisissothatthehandlingthreadi

摘抄自stackflow:

https://stackoverflow.com/questions/67784384/c-grpc-clientasyncreaderwriter-how-to-check-if-data-is-available-for-read

//Base class for async bidir RPCs handlers. 
//This is so that the handling thread is not associated with a specific rpc method.
classRpcHandler {
  //This will be used as the "tag" argument to the various grpc calls.
  structTagData {
    enum classType {
      start_done,
      read_done,
      write_done,
      //add more as needed...
};

    RpcHandler*handler;
    Type evt;
  };

  structTagSet {
    TagSet(RpcHandler*self)
        : start_done{self, TagData::Type::start_done},
          read_done{self, TagData::Type::read_done},
          write_done{self, TagData::Type::write_done} {}
    TagData start_done;
    TagData read_done;
    TagData write_done;
  };

 public:
  RpcHandler() : tags(this) {}

  virtual ~RpcHandler() = default;

  //The actual tag objects we'll be passing
TagSet tags;

  virtual void on_ready() = 0;
  virtual void on_recv() = 0;
  virtual void on_write_done() = 0;

  static void handling_thread_main(grpc::CompletionQueue*cq) {
    void* raw_tag =nullptr;
    bool ok = false;

    while (cq->Next(&raw_tag, &ok)) {
      TagData* tag = reinterpret_cast<TagData*>(raw_tag);
      if(!ok) {
        //Handle error
}
      else{
        switch (tag->evt) {
          caseTagData::Type::start_done:
            tag->handler->on_ready();
            break;
          caseTagData::Type::read_done:
            tag->handler->on_recv();
            break;
          caseTagData::Type::write_done:
            tag->handler->on_write_done();
            break;
        }
      }
    }
  }
};

void do_something_with_response(Response const&);

class MyHandler final : publicRpcHandler {
 public:
  using responder_ptr =std::unique_ptr<grpc::ClientAsyncReaderWriter<Request, Response>>;

  MyHandler(responder_ptr responder) : responder_(std::move(responder)) {
    //This lock is needed because StartCall() can
    //cause the handler thread to access the object.
    std::lock_guard lock(mutex_);

    responder_->StartCall(&tags.start_done);
  }

  ~MyHandler() {
    //TODO: finish/abort the streaming rpc as appropriate.
}

  void send(const Request&msg) {
    std::lock_guard lock(mutex_);
    if (!sending_) {
      sending_ = true;
      responder_->Write(msg, &tags.write_done);
    } else{
      //TODO: add some form of synchronous wait, or outright failure
      //if the queue starts to get too big.
queued_msgs_.push(msg);
    }
  }

 private:
  //When the rpc is ready, queue the first read
  void on_ready() override{
    std::lock_guard l(mutex_);  //To synchronize with the constructor
    responder_->Read(&incoming_, &tags.read_done);
  };

  //When a message arrives, use it, and start reading the next one
  void on_recv() override{
    //incoming_ never leaves the handling thread, so no need to lock

    //------ If handling is cheap and stays in the handling thread.
do_something_with_response(incoming_);
    responder_->Read(&incoming_, &tags.read_done);

    //------ If responses is expensive or involves another thread.
    //Response msg = std::move(incoming_); 
    //responder_->Read(&incoming_, &tags.read_done); 
    //do_something_with_response(msg);
};

  //When has been sent, send the next one is there is any
  void on_write_done() override{
    std::lock_guard lock(mutex_);
    if (!queued_msgs_.empty()) {
      responder_->Write(queued_msgs_.front(), &tags.write_done);
      queued_msgs_.pop();
    } else{
      sending_ = false;
    }
  };

  responder_ptr responder_;

  //Only ever touched by the handler thread post-construction.
Response incoming_;

  bool sending_ = false;
  std::queue<Request>queued_msgs_;

  std::mutex mutex_;  //grpc might be thread-safe, MyHandler isn't...
};

intmain() {
  //Start the thread as soon as you have a completion queue.
  auto cq = std::make_unique<grpc::CompletionQueue>();
  std::thread t(RpcHandler::handling_thread_main, cq.get());

  //Multiple concurent RPCs sharing the same handling thread:
  MyHandler handler1(serviceA->MethodA(&context, cq.get()));
  MyHandler handler2(serviceA->MethodA(&context, cq.get()));
  MyHandlerB handler3(serviceA->MethodB(&context, cq.get()));
  MyHandlerC handler4(serviceB->MethodC(&context, cq.get()));
}

免责声明:文章转载自《GRPC异步双向流处理的流程伪代码》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇sqlsugarimage创建栅格地图下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

latex之插入伪代码 [转]

常用的排版伪代码包有clrscode, algorithm, algorithmic, algorithmicx, algorithm2e (1)clrscodeclrscode 是著名的算法教材 Introduction to Algorithms, 2nd ed. 的作者排版该书时自己制定的。由于我非常喜欢其排版,及写伪代码的风格是跟着这本书学的,所以...