In a previous post, I introduced System.Threading.Channels and explained how it can be used. At a high level, it provides a modern, optimised asynchronous API for in-process producer/consumer patterns. Since that post, Stephen Toub, Partner Software Engineer at Microsoft, has published an in-depth blog post which shows how the feature is designed and covers common consumption patterns. If you’re simply looking to make use of Channels from application code, those two posts are the right places to start.
In this post, we will start going a bit deeper and study the internal implementation details for UnboundedChannel<T>. While this level of knowledge is not necessary to use the Channels feature, I find it interesting and useful to seek to understand at least one layer of abstraction below my own code. I’ve previously written about this in my post “Become a better developer by reading source code”. This post is the first of three exploring UnboundedChannel<T>. Today, we’ll focus on the class inheritance and how an UnboundedChannel is created.
Other Posts in Series
- Part 1 – This post
- Part 2 – UnboundedChannelWriter
- Part 3 – UnboundedChannelReader
NOTE: This post has been written while .NET 5 was in the release candidate phase. I don’t expect many code changes to Channels in the immediate future, but be aware, since we are studying internal implementation details, some details in this post may become outdated. You may want to check the source on GitHub for the most current code.
UnboundedChannel<T> Inheritance
The UnboundedChannel<T> type derives from the abstract base Channel<T> class, which itself derives from the abstract Channel<TWrite, TRead> class. The first interesting observation here is that the Channel<TWrite, TRead> abstract type supports the concept of different types for writing and reading. This means that, in theory, a channel implementation could include a transformation from a source type to a destination type. If you’ve used the TPL Dataflow library, you can see a parallel to the TransformBlock<TInput,TOutput>. Currently, there are no framework implementations which perform a transform, but it’s feasible to create one for your own specific requirements. The Channel<T> abstract class provides an abstraction on top of Channel<TWrite, TRead> for writing and reading a single type. Channel<T> is the base class used by framework channel implementations.
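To make the inheritance chain concrete, here is a minimal sketch that checks it at runtime. It only uses the public Channel.CreateUnbounded<T>() factory and standard type checks; the variable names are my own.

```csharp
using System;
using System.Threading.Channels;

// CreateUnbounded<T> is declared to return the abstract Channel<T>.
Channel<int> channel = Channel.CreateUnbounded<int>();

// Channel<T> derives from Channel<TWrite, TRead> with both type arguments set to T.
bool isTwoTypeChannel = channel is Channel<int, int>;
var baseOfChannelT = typeof(Channel<int>).BaseType;

Console.WriteLine(isTwoTypeChannel);                             // True
Console.WriteLine(baseOfChannelT == typeof(Channel<int, int>)); // True
```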
The base abstract class includes the following four members:
public ChannelReader<TRead> Reader { get; protected set; } = null!; // derived types should always set the Reader as part of construction
This property allows consumers to access the channel reader when consuming from the channel. As the comment states, this is expected to be set from the derived channel implementation, with an appropriate ChannelReader<T> implementation.
public ChannelWriter<TWrite> Writer { get; protected set; } = null!; // derived types should always set the Writer as part of construction
This property allows producers to access the channel writer, used to write new items into the channel. Again, this must be set by the derived type.
The base type also includes two implicit operators:
public static implicit operator ChannelReader<TRead>(Channel<TWrite, TRead> channel) => channel.Reader;
public static implicit operator ChannelWriter<TWrite>(Channel<TWrite, TRead> channel) => channel.Writer;
These support implicit casting from the channel to either its readable or writeable half. For example, a consumer of the Channel library can write the following code to access the ChannelWriter<T> by implicitly casting from the UnboundedChannel<T>.
var channel = Channel.CreateUnbounded<string>();
ChannelWriter<string> writer = channel;
A more common scenario that this enables is allowing an UnboundedChannel<T> to be passed into a method accepting a ChannelWriter<T> parameter.
That’s it for the base abstraction, so let’s move on to explore one of the default framework implementations, UnboundedChannel<T>.
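As a small illustration of that scenario, the sketch below passes a channel directly to a method that only accepts the writable half. WriteGreetings is a hypothetical helper I've made up for this example; everything else is the public Channels API.

```csharp
using System;
using System.Threading.Channels;

// Hypothetical producer method: it only needs the writable half of a channel.
static int WriteGreetings(ChannelWriter<string> writer)
{
    var written = 0;
    foreach (var message in new[] { "hello", "world" })
    {
        // TryWrite on an unbounded channel succeeds unless the channel has been completed.
        if (writer.TryWrite(message)) written++;
    }
    writer.Complete();
    return written;
}

Channel<string> channel = Channel.CreateUnbounded<string>();

// The implicit operator converts the channel to its ChannelWriter<string>.
int count = WriteGreetings(channel);

// The same works for the readable half.
ChannelReader<string> reader = channel;
while (reader.TryRead(out var item))
{
    Console.WriteLine(item); // hello, then world
}
```

Accepting only a ChannelWriter<T> (or ChannelReader<T>) keeps a method honest about which side of the channel it is allowed to touch.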
UnboundedChannel<T> Members
We need not address all members of the type here since many will come up as we explore the code. The main member I want to highlight here is the private _items field.
private readonly ConcurrentQueue<T> _items = new ConcurrentQueue<T>();
This is one of the most important members since this is where items written by producers may be stored until consumers read them. The reason I say “may be stored”, is that the Channel is optimised to avoid this if possible. We’ll understand how that works in the next post when we look at the ChannelWriter.
Different channel implementations may use various underlying collections. Here, the concurrent queue is used, which provides a thread-safe First-In-First-Out queue data structure.
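For anyone less familiar with ConcurrentQueue<T>, this short sketch shows the First-In-First-Out behaviour using only its public Enqueue and TryDequeue methods. It is not channel code, just the underlying collection on its own.

```csharp
using System;
using System.Collections.Concurrent;

var items = new ConcurrentQueue<string>();
items.Enqueue("first");
items.Enqueue("second");

// Items come back out in the order they went in.
bool gotFirst = items.TryDequeue(out var a);
bool gotSecond = items.TryDequeue(out var b);

// TryDequeue returns false rather than throwing or blocking when the queue is empty.
bool gotThird = items.TryDequeue(out var c);

Console.WriteLine($"{a}, {b}, more items: {gotThird}"); // first, second, more items: False
```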
Creating an UnboundedChannel
A logical place to start is to focus on how an instance of an UnboundedChannel can be created. Let’s look at its constructor.
A private field is set with the value of the single parameter, indicating whether continuations should run asynchronously (more on that coming up).
Next, a TaskCompletionSource is created and stored into a _completion field which will be used to coordinate completion of the Channel. This is created with any appropriate TaskCreationOptions flags added. When the runContinuationsAsynchronously parameter is true, the TaskCreationOptions.RunContinuationsAsynchronously flag is set, otherwise None is set.
Finally, an UnboundedChannelReader and UnboundedChannelWriter are created and set against corresponding properties on the abstract base class. You’ll recall that these were null on the abstract class and were expected to be set by the derived type.
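The three steps described above can be sketched with a small channel of our own. To be clear, this is not the framework source: SketchChannel<T>, SketchReader, SketchWriter and the RunsContinuationsAsynchronously property are hypothetical names, the reader and writer are heavily simplified (no waiting, no completion handling), and I'm assuming .NET 5 or later for the non-generic TaskCompletionSource.

```csharp
using System;
using System.Collections.Concurrent;
using System.Diagnostics.CodeAnalysis;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = new SketchChannel<int>(runContinuationsAsynchronously: true);
channel.Writer.TryWrite(42);
channel.Reader.TryRead(out var value);
Console.WriteLine(value); // 42

// Hypothetical, simplified stand-in that mirrors the constructor steps described in the post.
sealed class SketchChannel<T> : Channel<T>
{
    private readonly ConcurrentQueue<T> _items = new ConcurrentQueue<T>();
    private readonly bool _runContinuationsAsynchronously;
    private readonly TaskCompletionSource _completion;

    public SketchChannel(bool runContinuationsAsynchronously)
    {
        // Step 1: remember how continuations should be run.
        _runContinuationsAsynchronously = runContinuationsAsynchronously;

        // Step 2: create the completion source with the matching creation options.
        _completion = new TaskCompletionSource(
            runContinuationsAsynchronously
                ? TaskCreationOptions.RunContinuationsAsynchronously
                : TaskCreationOptions.None);

        // Step 3: set the Reader and Writer properties declared on the abstract base class.
        Reader = new SketchReader(this);
        Writer = new SketchWriter(this);
    }

    // Exposed only so the sketch can be inspected; not part of the real type.
    public bool RunsContinuationsAsynchronously => _runContinuationsAsynchronously;

    private sealed class SketchReader : ChannelReader<T>
    {
        private readonly SketchChannel<T> _parent;
        internal SketchReader(SketchChannel<T> parent) => _parent = parent;

        public override Task Completion => _parent._completion.Task;

        public override bool TryRead([MaybeNullWhen(false)] out T item)
            => _parent._items.TryDequeue(out item);

        // Simplification: reports current state instead of waiting for items to arrive.
        public override ValueTask<bool> WaitToReadAsync(CancellationToken cancellationToken = default)
            => new ValueTask<bool>(!_parent._items.IsEmpty);
    }

    private sealed class SketchWriter : ChannelWriter<T>
    {
        private readonly SketchChannel<T> _parent;
        internal SketchWriter(SketchChannel<T> parent) => _parent = parent;

        // Simplification: always accepts the item, since completion is not modelled.
        public override bool TryWrite(T item)
        {
            _parent._items.Enqueue(item);
            return true;
        }

        public override ValueTask<bool> WaitToWriteAsync(CancellationToken cancellationToken = default)
            => new ValueTask<bool>(true);
    }
}
```

The part worth noticing is the constructor: the derived type is responsible for assigning Reader and Writer, which is possible because both properties have protected setters on the base class.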
A final important thing to highlight is that this constructor, the only constructor on this type, is marked internal. Therefore, we can’t access this constructor from our application code, outside of this assembly. How do we create an UnboundedChannel?
To create an instance of this type, we must use a method from the static Channel class, of which there are two overloads.
CreateUnbounded<T>()
CreateUnbounded<T>(UnboundedChannelOptions options)
The first, parameterless method creates a new UnboundedChannel<T>, passing true as the value for the runContinuationsAsynchronously argument.
new UnboundedChannel<T>(runContinuationsAsynchronously: true)
When set to true, this value controls how some of the internal Tasks are treated within the internal implementation. The default value here intends to avoid inlining of continuations such that they are queued onto the thread pool (or onto the originating synchronisation context).
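To get a feel for what inlining means here, this sketch uses a plain TaskCompletionSource<int> rather than a channel. ContinuationRanInline is a hypothetical helper of mine: it attaches a continuation that asks to run synchronously, completes the source, and reports whether the continuation ran on the thread that called SetResult. With TaskCreationOptions.None I'd expect it to be inlined; with RunContinuationsAsynchronously it should be queued elsewhere instead.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Returns true when the continuation ran on the same thread that called SetResult.
static bool ContinuationRanInline(TaskCreationOptions options)
{
    var tcs = new TaskCompletionSource<int>(options);
    var done = new ManualResetEventSlim();
    int continuationThreadId = -1;

    // ExecuteSynchronously requests that the continuation be inlined where permitted.
    tcs.Task.ContinueWith(
        _ =>
        {
            continuationThreadId = Environment.CurrentManagedThreadId;
            done.Set();
        },
        CancellationToken.None,
        TaskContinuationOptions.ExecuteSynchronously,
        TaskScheduler.Default);

    tcs.SetResult(1);

    // Block this thread until the continuation has run, wherever that happens.
    done.Wait();
    return continuationThreadId == Environment.CurrentManagedThreadId;
}

bool inlinedWithNone = ContinuationRanInline(TaskCreationOptions.None);
bool inlinedWithAsync = ContinuationRanInline(TaskCreationOptions.RunContinuationsAsynchronously);

Console.WriteLine($"None: ran inline = {inlinedWithNone}");
Console.WriteLine($"RunContinuationsAsynchronously: ran inline = {inlinedWithAsync}");
```

Inlining can be faster, but it also means the code completing the task ends up running someone else's continuation, which is why avoiding it is the safer default.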
The second CreateUnbounded overload accepts UnboundedChannelOptions and uses this to configure the unbounded Channel.
As a side note, UnboundedChannelOptions derives from the base ChannelOptions type and adds no further members. The base ChannelOptions includes three public properties which can be configured.
- AllowSynchronousContinuations can be set to true if operations performed on a channel may synchronously invoke continuations.
- SingleWriter can be set to true in cases where we can guarantee only a single producer will be writing to the Channel.
- SingleReader can be used similarly when we can ensure only a single consumer will read from the Channel.
Internally, the Channel implementations can make some optimisations when a single reader or writer can be guaranteed, since certain thread-safety assumptions can be made and some locking possibly avoided.
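Here is a minimal sketch of configuring those three properties and passing them to the second CreateUnbounded overload. The specific values are just for illustration; all three properties default to false.

```csharp
using System;
using System.Threading.Channels;

// All three options default to false on a new instance.
var defaults = new UnboundedChannelOptions();
Console.WriteLine(defaults.SingleWriter);                  // False
Console.WriteLine(defaults.SingleReader);                  // False
Console.WriteLine(defaults.AllowSynchronousContinuations); // False

// Illustrative configuration: we promise that only one producer will ever write.
var options = new UnboundedChannelOptions
{
    SingleWriter = true,
    SingleReader = false,
    AllowSynchronousContinuations = false
};

Channel<string> channel = Channel.CreateUnbounded<string>(options);
```

Bear in mind that SingleWriter and SingleReader are promises made by the caller. If they are set to true, it is up to our code to honour them.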
Okay, back to the CreateUnbounded(UnboundedChannelOptions options) method implementation.
This code reveals an interesting implementation detail. In the case where a single consumer is indicated by the options, a specialised channel implementation, SingleConsumerUnboundedChannel, is returned. We won’t dive into that for this post, but I may revisit it in the future.
For now, let’s assume we don’t have a single reader scenario, and therefore, the code above creates a new UnboundedChannel<T>.
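We can observe this branching from application code by inspecting the runtime type of the returned channel. This is only a sketch for exploration: the concrete types are internal implementation details, so their names may differ between .NET versions and should never be relied upon in production code.

```csharp
using System;
using System.Threading.Channels;

var general = Channel.CreateUnbounded<int>(new UnboundedChannelOptions { SingleReader = false });
var singleReader = Channel.CreateUnbounded<int>(new UnboundedChannelOptions { SingleReader = true });

// Both are exposed as Channel<int>, but the concrete implementation depends on SingleReader.
Console.WriteLine(general.GetType().Name);
Console.WriteLine(singleReader.GetType().Name);

bool sameImplementation = general.GetType() == singleReader.GetType();
Console.WriteLine(sameImplementation);
```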
Summary
In this first post, we’ve started to explore the code behind the UnboundedChannel<T>, one of three Channel<T> implementations which are part of the library and included in newer .NET versions. We’ve touched on the inheritance chain for the type and focused on how an instance is created.
Join me in part two, to learn about writing to an UnboundedChannel<T> using the UnboundedChannelWriter.