How to read an async server-side stream using gRPC C++

I am trying to implement asynchronous server-side streaming in C++, but I cannot find a good example of it. I am having a hard time reading the stream asynchronously.

Server Code

class ServerImpl final
{
public:
    ~ServerImpl()
    {
        server_->Shutdown();
        cq_->Shutdown();
    }

    void Run()
    {
        std::string server_address("0.0.0.0:50051");

        ServerBuilder builder;
        builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
        builder.RegisterService(&service_);

        cq_ = builder.AddCompletionQueue();
        server_ = builder.BuildAndStart();
        std::cout << "Server listening on " << server_address << std::endl;

        HandleRpcs();
    }

private:
    class CallData
    {
    public:
        CallData(MultiGreeter::AsyncService* service, ServerCompletionQueue* cq)
            : service_(service)
            , cq_(cq)
            , responder_(&ctx_)
            , status_(CREATE)
            , times_(0)
        {
            Proceed();
        }

        void Proceed()
        {
            if (status_ == CREATE)
            {
                status_ = PROCESS;
                // The generated method name follows the rpc name in the proto (SayHello).
                service_->RequestSayHello(&ctx_, &request_, &responder_, cq_, cq_, this);
            }
            else if (status_ == PROCESS)
            {                   
                if (times_ == 0)
                {
                    new CallData(service_, cq_);
                }    
                if (times_++ >= 3)
                {
                    status_ = FINISH;
                    responder_.Finish(Status::OK, this);
                }
                else
                {
                    std::string prefix("Hello ");
                    reply_.set_message(prefix + request_.name() + ", no " + request_.num_greetings());    
                    responder_.Write(reply_, this);
                }
            }
            else
            {
                GPR_ASSERT(status_ == FINISH);
                delete this;
            }
        }    
    private:
        MultiGreeter::AsyncService* service_;
        ServerCompletionQueue* cq_;
        ServerContext ctx_;  
        HelloRequest request_;
        HelloReply reply_; 
        ServerAsyncWriter<HelloReply> responder_;    
        int times_;   
        enum CallStatus
        {
            CREATE,
            PROCESS,
            FINISH
        };
        CallStatus status_;
    };

    void HandleRpcs()
    {
        new CallData(&service_, cq_.get());
        void* tag; 
        bool ok;
        while (true)
        {
            GPR_ASSERT(cq_->Next(&tag, &ok));
            GPR_ASSERT(ok);
            static_cast<CallData*>(tag)->Proceed();
        }
    }
    std::unique_ptr<ServerCompletionQueue> cq_;
    MultiGreeter::AsyncService service_;
    std::unique_ptr<Server> server_;
};

Client Code

class GreeterClient
{
public:
    GreeterClient(std::shared_ptr<Channel> channel)
        : stub_(MultiGreeter::NewStub(channel))
    {}
    void SayHello(const std::string& user, const std::string& num_greetings)
    {
        HelloRequest request;
        request.set_name(user);
        request.set_num_greetings(num_greetings);
        ClientContext context;
        CompletionQueue cq;
        void* got_tag = (void*)1;
        bool ok = false;
        HelloReply reply;
        std::unique_ptr<ClientAsyncReader<HelloReply>> reader(stub_->PrepareAsyncSayHello(&context, request, &cq));
        std::cout << "Got reply: " << reply.message() << std::endl;          
        reader->Read(&reply,got_tag);
        Status status;
        reader->Finish(&status, (void*)1);
        GPR_ASSERT(cq.Next(&got_tag, &ok));
        GPR_ASSERT(ok);   
        if (status.ok()) 
        {
            std::cout << "sayHello rpc succeeded." << std::endl;
        } 
        else 
        {
            std::cout << "sayHello rpc failed." << std::endl;
            std::cout << status.error_code() << ": " << status.error_message() << std::endl;
        }
    }
private:
    std::unique_ptr<MultiGreeter::Stub> stub_;
};

My proto service

rpc SayHello (HelloRequest) returns (stream HelloReply) {}

When I read the stream synchronously using ClientReader, it works fine, but when I use ClientAsyncReader I get this runtime error:

Assertion failed: (started_), function Finish, file /usr/local/include/grpcpp/impl/codegen/async_stream_impl.h, line 250.
23:25:40: The program has unexpectedly finished.

Thanks in advance.

asked Jan 30 '26 06:01 by laxmi

1 Answer

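The error message Assertion failed: (started_), function Finish means an operation was issued on a ClientAsyncReader whose call has not been started yet. PrepareAsyncSayHello only creates the reader. Unlike AsyncSayHello, it does not start the call, so you have to call StartCall yourself and wait for its tag on the completion queue before issuing Read or Finish.

There is a second problem with the order of operations: Finish is issued right after Read, before the read has completed. With ClientAsyncReader, each operation has to come back from cq.Next before the next one is issued, so you should first do the Read, wait for it to finish, and then do the Finish. Giving each operation its own tag makes it easier to tell the events apart.

In your case try something like this:

...
// PrepareAsyncSayHello does not start the call, so start it explicitly
reader->StartCall((void*)1);

// Wait for the call to start
void* got_tag_start;
bool ok_start;
GPR_ASSERT(cq.Next(&got_tag_start, &ok_start));
GPR_ASSERT(ok_start);

reader->Read(&reply, (void*)2);

// Wait for the read operation to complete
void* got_tag_read;
bool ok_read;
GPR_ASSERT(cq.Next(&got_tag_read, &ok_read));
GPR_ASSERT(ok_read);
std::cout << "Got reply: " << reply.message() << std::endl;

Status status;
reader->Finish(&status, (void*)3);

// Wait for the Finish operation to complete
void* got_tag_finish;
bool ok_finish;
GPR_ASSERT(cq.Next(&got_tag_finish, &ok_finish));
GPR_ASSERT(ok_finish);
...

This snippet reads a single message. Since your server writes several replies, keep calling Read and waiting for its tag until ok comes back false (the stream has ended), and only then call Finish.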
answered Feb 01 '26 21:02 by Anton Kesy
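
To read the whole stream rather than a single message, a common pattern is a small state machine driven by the tags coming off the completion queue. Below is a minimal sketch of that control flow, written without gRPC so that it compiles on its own. FakeQueue and FakeReader are hypothetical stand-ins for grpc::CompletionQueue and ClientAsyncReader<HelloReply>, and ReadAll, Result, AsTag, and the k* tag constants are names made up for illustration. Only the order of StartCall, Read, and Finish is meant to carry over to real code.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for grpc::CompletionQueue: a FIFO of (tag, ok) events.
struct FakeQueue {
    std::deque<std::pair<void*, bool>> events;

    bool Next(void** tag, bool* ok) {
        if (events.empty()) return false;
        *tag = events.front().first;
        *ok = events.front().second;
        events.pop_front();
        return true;
    }
};

// Hypothetical stand-in for ClientAsyncReader<HelloReply> serving canned messages.
class FakeReader {
public:
    FakeReader(FakeQueue* cq, std::vector<std::string> messages)
        : cq_(cq), messages_(std::move(messages)) {}

    void StartCall(void* tag) {
        started_ = true;
        cq_->events.emplace_back(tag, true);
    }

    // Completes with ok == false once the stream is exhausted.
    void Read(std::string* out, void* tag) {
        assert(started_);  // models the assertion from the question
        const bool has_message = next_ < messages_.size();
        if (has_message) *out = messages_[next_++];
        cq_->events.emplace_back(tag, has_message);
    }

    void Finish(bool* status_ok, void* tag) {
        assert(started_);
        *status_ok = true;  // the fake server always ends with an OK status
        cq_->events.emplace_back(tag, true);
    }

private:
    FakeQueue* cq_;
    std::vector<std::string> messages_;
    std::size_t next_ = 0;
    bool started_ = false;
};

// One distinct tag per kind of operation (illustrative values).
constexpr std::intptr_t kStart = 1, kRead = 2, kFinish = 3;

void* AsTag(std::intptr_t id) { return reinterpret_cast<void*>(id); }

struct Result {
    std::vector<std::string> messages;
    bool status_ok = false;
};

// Drains the stream: start the call, read until ok is false, then finish.
Result ReadAll(std::vector<std::string> server_messages) {
    FakeQueue cq;
    FakeReader reader(&cq, std::move(server_messages));
    Result result;
    std::string reply;

    reader.StartCall(AsTag(kStart));

    void* tag = nullptr;
    bool ok = false;
    while (cq.Next(&tag, &ok)) {
        switch (reinterpret_cast<std::intptr_t>(tag)) {
        case kStart:
            if (ok) reader.Read(&reply, AsTag(kRead));
            else reader.Finish(&result.status_ok, AsTag(kFinish));
            break;
        case kRead:
            if (ok) {
                result.messages.push_back(reply);   // reply is valid only now
                reader.Read(&reply, AsTag(kRead));  // ask for the next message
            } else {
                reader.Finish(&result.status_ok, AsTag(kFinish));
            }
            break;
        case kFinish:
            return result;
        }
    }
    return result;
}
```

In real code, Next blocks until an event is available, and the Finish tag is the point where status.ok() and status.error_message() would be checked.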


