Server Streaming with gRPC and .NET Core

In this post, I want to focus on the implementation of server streaming and client consumption of the stream when using gRPC with .NET Core. The samples in this post are based on versions you can find in my complete demo application up on my gRPC Demos GitHub repository.

As a bonus, we’ll make use of some newer C# 8 language features in our code!

This post is part of my gRPC and ASP.NET Core series.

What is Server Streaming in gRPC?

Server streaming in gRPC is one of the four types of service method we can define. In this scenario, the client sends a request to the server and receives a stream of messages in return. The number of messages that will be streamed is determined by the server. Typically, a client reads all of the messages from the stream, handling them as soon as they arrive. Within a single call, gRPC guarantees that the client receives the messages in the same order as they are sent from the server.

Where Would Server Streaming RPC be Useful?

Streaming is useful in any scenario where the server needs to send multiple items in response to a client call. A typical alternative in a traditional REST approach would be to include an array of items in the serialised model. In that case, all items would need to be generated or loaded before the response can be returned. One reason you might do this is to reduce the number of requests required to gather the data by bundling the items into a single response.

With server streaming in gRPC, the server sends messages as soon as it has them available. If each message requires some load or generation time, the server can reduce the time to first message by starting to stream as soon as the first message has been produced. We can even produce messages in parallel. The client can begin to act on each message immediately upon receiving it on the stream.

When streaming, each item sent as a message will usually be some distinct object that can be handled in isolation. This way, the client can work with each message as it is received. That doesn’t always have to be the case, but it’s where the best performance gains will be seen.

When streaming, a single gRPC channel can be used, which avoids repeated connections being made to the server. This helps to reduce the load on both clients and servers.

Defining a gRPC Server Streaming Service Method

We begin by defining an RPC method on our service in the proto file. A complete file can be found as part of the gRPC Demos repo. In that sample, three service methods exist, of which, the second is a server streaming method.
 
Here’s a simplified version of that proto file.
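The original listing isn't reproduced here, so what follows is a minimal sketch of what such a proto file might look like. The service name, method name, Empty request, streamed WeatherData response and the WeatherForecast C# namespace come from the descriptions in this post. The fields on WeatherData are assumptions made for illustration.

```protobuf
syntax = "proto3";

import "google/protobuf/empty.proto";
import "google/protobuf/timestamp.proto";

option csharp_namespace = "WeatherForecast";

service WeatherForecasts {
  // Server streaming: a single request yields a stream of WeatherData messages.
  rpc GetWeatherStream (google.protobuf.Empty) returns (stream WeatherData);
}

// Field names and numbers below are illustrative assumptions.
message WeatherData {
  google.protobuf.Timestamp date_time_stamp = 1;
  int32 temperature_c = 2;
  string summary = 3;
}
```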

This file declares the service definition using protocol buffers.
 
We’ll focus on line 10 for now, which defines an RPC method for our WeatherForecasts service.

This method, called GetWeatherStream, accepts an Empty request message and returns a ‘stream WeatherData’. By including the stream keyword before the message type, we identify that the response is a stream and not a single message.

Implementing the Server

Our server application includes a service reference to the proto file.
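As a rough sketch, a service reference of this kind is usually a Protobuf item in the server's project file. The file path below is a hypothetical placeholder; the GrpcServices attribute is what asks the tooling for server-side code.

```xml
<ItemGroup>
  <!-- Hypothetical path; point Include at wherever the proto file lives. -->
  <Protobuf Include="Protos\weather.proto" GrpcServices="Server" />
</ItemGroup>
```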

This reference identifies that the server code should be auto-generated for this application, based upon the proto file service definition. Upon building our application, the gRPC tooling generates that code, and within the application, we will have an abstract class called WeatherForecastsBase included within our specified namespace of WeatherForecast.

Our implementation will derive from that base class and override the appropriate method, providing the server-side implementation for the service.
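A minimal sketch of what that derived class might look like is shown here. It assumes the generated code lives in the WeatherForecast namespace and that the base class is nested inside a generated WeatherForecasts type, which is the usual shape of the C# output. The method body is only a placeholder.

```csharp
using System.Threading.Tasks;
using Google.Protobuf.WellKnownTypes;
using Grpc.Core;
using Microsoft.Extensions.Logging;
using WeatherForecast; // namespace of the generated code (from csharp_namespace)

public class WeatherService : WeatherForecasts.WeatherForecastsBase
{
    private readonly ILogger<WeatherService> _logger;

    // The logger is supplied by the ASP.NET Core dependency injection container.
    public WeatherService(ILogger<WeatherService> logger) => _logger = logger;

    public override Task GetWeatherStream(
        Empty request,
        IServerStreamWriter<WeatherData> responseStream,
        ServerCallContext context)
    {
        // Placeholder body: completes immediately without writing any messages.
        return Task.CompletedTask;
    }
}
```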

The WeatherService can accept injected dependencies from the ASP.NET Core dependency injection container. In this example, we inject an ILogger instance.

We override the generated GetWeatherStream method from the base class. The method signature accepts an Empty message, an IServerStreamWriter<WeatherData> and a ServerCallContext.

We can discard the Empty message since there are no fields on it. We’ll use the IServerStreamWriter in a few moments, to write messages to the stream. The ServerCallContext provides access to data about the current call. It’s similar to the HttpContext we have access to on regular HTTP requests.

The return type for this method is a Task.

We can now implement this method:
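The listing below is a sketch of how the implementation could look, following the steps described in the rest of this section. The WeatherData property names (DateTimeStamp, TemperatureC, Summary), the Summaries array, the temperature range and the log message are assumptions for illustration; the 20 message limit, the 500ms delay and the cancellation check come from the post.

```csharp
using System;
using System.Threading.Tasks;
using Google.Protobuf.WellKnownTypes;
using Grpc.Core;
using Microsoft.Extensions.Logging;
using WeatherForecast; // namespace of the generated code

public class WeatherService : WeatherForecasts.WeatherForecastsBase
{
    // Hypothetical sample data used to fill in each forecast.
    private static readonly string[] Summaries = { "Freezing", "Mild", "Warm", "Scorching" };

    private readonly ILogger<WeatherService> _logger;

    public WeatherService(ILogger<WeatherService> logger) => _logger = logger;

    public override async Task GetWeatherStream(
        Empty request,
        IServerStreamWriter<WeatherData> responseStream,
        ServerCallContext context)
    {
        var rng = new Random();
        var now = DateTime.UtcNow;
        var i = 0;

        // Stop when the client cancels the call or after 20 messages.
        while (!context.CancellationToken.IsCancellationRequested && i < 20)
        {
            // Simulate the effort of loading or generating a forecast.
            await Task.Delay(500);

            var forecast = new WeatherData
            {
                // Timestamp.FromDateTime requires a UTC DateTime, which 'now' is.
                DateTimeStamp = Timestamp.FromDateTime(now.AddDays(i)),
                TemperatureC = rng.Next(-20, 55),
                Summary = Summaries[rng.Next(Summaries.Length)]
            };
            i++;

            _logger.LogInformation("Sending WeatherData response");

            await responseStream.WriteAsync(forecast);
        }
    }
}
```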

We begin by establishing three local variables for a Random instance, the current UTC DateTime and an integer counter ‘i’.

The bulk of the logic exists within the while loop, which keeps running only while two conditions both hold. The second condition checks whether ‘i’ is less than 20; once it is not, the loop ends. The first condition examines the CancellationToken on the ServerCallContext.

This token represents the state of the call. If the client were to cancel their request, signalling that they no longer plan to read from the stream, there is no value in the server continuing to produce response messages. Client cancellation can be tracked on the server using this CancellationToken. A client cancellation will also cause the while loop to break.

Within the while loop, we simulate the generation of some weather forecasts, represented by the auto-generated type WeatherData. This type has been generated by the gRPC tooling based on the message definition in our proto file.

Before generating each forecast, we use a Task.Delay simulating the effort of loading/generating the forecast. In this case, each forecast takes 500ms to create.

We then create a new WeatherData instance with some random data set against its properties.

We use the injected logger to log the fact that a response is about to be sent and finally, we can call WriteAsync on the IServerStreamWriter, passing in the generated WeatherData instance.

The gRPC libraries and generated code will now write that message onto the response stream, and it will travel over the gRPC channel to the client. Our loop will write up to 20 messages onto the stream unless the client application cancels its call.

That concludes our basic server implementation for the gRPC service. Most of the implementation code exists in the gRPC libraries, so our work is limited to producing WeatherData objects and writing them to the stream.

The final thing to do is to ensure that the service is registered with ASP.NET Core.
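A minimal Startup class for this might look like the sketch below. It assumes the ASP.NET Core 3.x Startup pattern with endpoint routing and a service class named WeatherService, as described in this post.

```csharp
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection;

public class Startup
{
    public void ConfigureServices(IServiceCollection services)
    {
        // Registers the gRPC services with the dependency injection container.
        services.AddGrpc();
    }

    public void Configure(IApplicationBuilder app)
    {
        app.UseRouting();

        app.UseEndpoints(endpoints =>
        {
            // Maps an endpoint for the gRPC service implementation.
            endpoints.MapGrpcService<WeatherService>();
        });
    }
}
```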

This code is the bare minimum we need for this server to function. In ConfigureServices, we register the gRPC services with the dependency injection container using the AddGrpc extension method on IServiceCollection.

In Configure, we ensure an endpoint is mapped to the gRPC service using MapGrpcService. We pass our WeatherService type as the generic argument.

At this point, we can run the gRPC server using ‘dotnet run’ from a command line.

Again, for a slightly more complete server sample, my gRPC Demos repo can be cloned.

Implementing the Client

We can use a basic .NET Core console app to act as our client for this example.

We must add some NuGet package references and a gRPC service reference to our project. The quickest way is to modify the project file directly, but you can use the Visual Studio tooling or the .NET CLI to achieve this. After adding the required references, our project file for the client looks like this:
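The original project file isn't reproduced here, so this is a sketch of what it might contain. The package names match those discussed in this post; the version numbers, target framework and proto file path are assumptions that you should replace with values that suit your environment.

```xml
<Project Sdk="Microsoft.NET.Sdk">

  <PropertyGroup>
    <OutputType>Exe</OutputType>
    <TargetFramework>netcoreapp3.1</TargetFramework>
  </PropertyGroup>

  <ItemGroup>
    <!-- Versions are illustrative; use the latest available releases. -->
    <PackageReference Include="Google.Protobuf" Version="3.11.2" />
    <PackageReference Include="Grpc.Net.Client" Version="2.27.0" />
    <PackageReference Include="Grpc.Tools" Version="2.27.0" PrivateAssets="All" />
  </ItemGroup>

  <ItemGroup>
    <!-- Hypothetical path; GrpcServices="Client" requests client code generation. -->
    <Protobuf Include="..\Protos\weather.proto" GrpcServices="Client" />
  </ItemGroup>

</Project>
```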

The various packages include the gRPC tooling, which triggers the compilation of the proto file and code generation. We also reference the Google.Protobuf package which supports the serialisation/deserialisation of the messages. Grpc.Net.Client supports making a client connection using gRPC.
 
We also have a service reference to the same proto file as used to define the server. This time we ask for the client code generation to occur.
 
In the Program class, we can make a simple gRPC call and work with the stream of messages from the server.
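Here is a sketch of how such a Program class could be written, matching the walkthrough that follows. The address scheme, the console output format and the WeatherData property names are assumptions; the port, the 5 second cancellation and the overall flow come from the post.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Google.Protobuf.WellKnownTypes;
using Grpc.Core;
using Grpc.Net.Client;
using WeatherForecast; // namespace of the generated client code

internal class Program
{
    private static async Task Main()
    {
        // Assumed address; adjust the scheme and port for your environment.
        using var channel = GrpcChannel.ForAddress("https://localhost:5005");
        var client = new WeatherForecasts.WeatherForecastsClient(channel);

        // Tokens from this source are cancelled after 5 seconds.
        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));

        using var streamingCall =
            client.GetWeatherStream(new Empty(), cancellationToken: cts.Token);

        try
        {
            // Each iteration resumes when the next message arrives on the stream.
            await foreach (var weatherData in
                streamingCall.ResponseStream.ReadAllAsync(cts.Token))
            {
                Console.WriteLine(
                    $"{weatherData.DateTimeStamp.ToDateTime():s} | " +
                    $"{weatherData.Summary} | {weatherData.TemperatureC} C");
            }
        }
        catch (RpcException ex) when (ex.StatusCode == StatusCode.Cancelled)
        {
            Console.WriteLine("Stream cancelled.");
        }
    }
}
```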

First, we establish a gRPC channel. We can use the ForAddress method to do this, passing the correct address for our local server instance. In my case, the port for the server is 5005, but you may need to adjust this for your environment.

We then create a client, which is an instance of WeatherForecastsClient. This is automatically code-generated by the tooling from the service definition in the proto file. We must pass a channel to its constructor, which the client instance will use when making gRPC calls.

Our code establishes a CancellationTokenSource which we’ll use to simulate early client cancellation of a gRPC call. Our token source is set up so that tokens will cancel after 5 seconds.

We can now call methods on the client instance. While these look like regular method calls, they are in fact executing remotely on our server. The code-gen for the client means that we don’t need to write any of that boilerplate code to issue the request, handle the response and deserialise the content ourselves.

The method signature for GetWeatherStream requires an Empty message, so we create an instance of that. We also pass a CancellationToken, from the CancellationTokenSource.

This method returns an AsyncServerStreamingCall object, which represents a call that includes a stream of responses. This type is disposable, so we can ensure it is disposed of with a new C# 8 feature, using declarations. Rather than a traditional using block, we avoid a layer of indentation, and disposal occurs automatically when the enclosing scope (here, the method) exits.
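To illustrate using declarations on their own, here is a small sketch that needs nothing beyond the base class library. The Tracked and UsingDemo types are hypothetical names invented for this example; Tracked simply records when it is created and disposed.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical disposable type that records its lifetime in a shared log.
public sealed class Tracked : IDisposable
{
    private readonly string _name;
    private readonly List<string> _log;

    public Tracked(string name, List<string> log)
    {
        _name = name;
        _log = log;
        _log.Add($"open {_name}");
    }

    public void Dispose() => _log.Add($"dispose {_name}");
}

public static class UsingDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();
        Scope(log);
        log.Add("after scope");
        return log;
    }

    private static void Scope(List<string> log)
    {
        // Using declaration: no braces, disposed when this method's scope ends.
        using var resource = new Tracked("call", log);
        log.Add("working");
    }
}
```

Calling UsingDemo.Run() records the dispose entry before "after scope", showing that disposal happens as the declaring scope exits.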

Inside the try block, we access the ResponseStream on the AsyncServerStreamingCall. A ReadAllAsync extension method is available on it, which returns an IAsyncEnumerable, a type introduced with .NET Core 3.0. Using the new C# 8 await foreach syntax for async streams, we can asynchronously enumerate the responses as they are received, all in a non-blocking way. The loop resumes as each message arrives. I really like async streams, and this is a great use for them.
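If async streams are new to you, this standalone sketch shows the same await foreach pattern without any gRPC involved. The AsyncStreamDemo type and its methods are hypothetical names for illustration; the producer stands in for messages arriving on a response stream.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class AsyncStreamDemo
{
    // Hypothetical producer: yields each value after a short delay.
    public static async IAsyncEnumerable<int> ProduceAsync(int count)
    {
        for (var i = 0; i < count; i++)
        {
            await Task.Delay(100);
            yield return i;
        }
    }

    // Consumes the stream item by item, collecting what was received.
    public static async Task<List<int>> ConsumeAsync(int count)
    {
        var received = new List<int>();

        await foreach (var item in ProduceAsync(count))
        {
            Console.WriteLine($"Received {item}");
            received.Add(item);
        }

        return received;
    }

    public static async Task Main()
    {
        await ConsumeAsync(3);
    }
}
```

Each item is handled as soon as it is yielded, and the calling thread is not blocked while waiting for the next one.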

In the catch block, we handle the RpcException if one is thrown. When the call is cancelled, an RpcException carrying the Cancelled status code surfaces on the client. We catch that and, in this case, handle the cancellation status code by logging a message.

With this code, we have implemented a client. You’ll hopefully note that we didn’t deal with deserialisation directly, nor the implementation details of making an HTTP/2 request. The gRPC libraries and autogenerated code have dealt with that for us.

When running the client, you should see a series of forecasts appear in the console as the response messages are received. A delay occurs between each one as we simulated that delay on the server.

Summary

We’ve covered a fair amount of code in this post. Hopefully, you can now see how server streaming can be defined in the service contract. That contract can then be used to generate the server stub and the client C# code. A reasonably simple server implementation was added, supporting client-side cancellation scenarios. We also made use of some new C# 8 language features like async streams and using declarations to simplify the code that we wrote.

For more gRPC content, you can find all of my posts that are part of my gRPC and ASP.NET Core series.


Steve Gordon

Steve Gordon is a Pluralsight author, 6x Microsoft MVP, and a .NET engineer at Elastic where he maintains the .NET APM agent and related libraries. Steve is passionate about community and all things .NET related, having worked with ASP.NET for over 21 years. Steve enjoys sharing his knowledge through his blog, in videos and by presenting talks at user groups and conferences. Steve is excited to participate in the active .NET community and founded .NET South East, a .NET Meetup group based in Brighton. He enjoys contributing to and maintaining OSS projects. You can find Steve on most social media platforms as @stevejgordon