In the previous post in this mini-series, we learned how items are written to an UnboundedChannel<T>. We explored the UnboundedChannel<T> type itself in the first blog post. Today I will complete the series and focus on how items are read from an UnboundedChannel using its UnboundedChannelReader.
Other Posts in Series
NOTE: This post has been written while .NET 5 was in the release candidate phase. I don’t expect many code changes to channels in the immediate future, but be aware, since we are studying internal implementation details, some details in this post may become outdated. You may want to check the source on GitHub for the most current code.
Just as we saw when plunging into the writing end of the Channel, the UnboundedChannelReader is a private nested class of UnboundedChannel<T>. A new instance is created and assigned to the Reader property of the Channel during initialisation.
UnboundedChannelReader derives from the ChannelReader<T> abstract base class.
When the reader is constructed, it creates and stores two AsyncOperation singletons which are pooled and can be used under certain circumstances to avoid allocating new AsyncOperations.
These represent an operation which returns an item of type T, and an operation returning a bool. These are used by ReadAsync and WaitToReadAsync respectively. We touched on the AsyncOperation type in the previous post. It implements IValueTaskSource and IValueTaskSource<TResult> so can be pooled to avoid extra allocations, even when code is executing asynchronously.
There are several consumer patterns that one can use to read from a channel. Each may fit different scenarios for consuming applications. I won’t go into those specifics here. The original readme for Channels provides a good review of these patterns, as does Stephen Toub’s blog post “Introducing System.Threading.Channels”.
We’ll step through the code in this method, piece by piece. To view the full current code you can view the UnboundedChannel source on GitHub.
The ReadAsync method first checks if the CancellationToken parameter has been marked as cancellation requested. If so, it returns immediately with a cancelled task.
Next, it tries to Dequeue an item from the items collection (a ConcurrentQueue) on the parent UnboundedChannel<T>. The ConcurrentQueue, used as the backing store for the UnboundedChannel, is already thread-safe so this check need not occur inside any additional locking or thread synchronisation.
If an item is retrieved from the ConcurrentQueue, it will be returned as the result of the ValueTask<T>. This is synchronous, which is one of the reasons ValueTask is a better fit here than a more common Task. ValueTask<T> can avoid the allocation of a Task, in this case, returning the value directly to the caller.
Before returning the item, a call is made to CompleteIfDone, a private method of the UnboundedChannelReader.
In the previous post, we spent a little time focusing on TryComplete; a method on the UnboundedChannelWriter. TryComplete is used to signal that writing to the Channel has ended. In cases where there are no remaining items in the items collection, it can mark the Channel as fully completed. However, if there are items still in the ConcurrentQueue, those must be allowed to be drained by any consumers.
The CompleteIfDone method above is where this check occurs. After an item has been read, this method will check whether the parent UnboundedChannel _doneWriting field is not null. In that case, no further writes will occur. The second part of the condition then checks if the items collection contains any additional items. If not, then all consumers may complete since no further items will ever be read. When these conditions are met the ChannelUtilities.Complete method is used to mark the TaskCompletionSource from the Channel as done. This may include marking it as cancelled, as having caused an exception, or with a default VoidResult.
If the ConcurrentQueue of items is empty, the code enters a lock over the SyncObj on the parent Channel.
The code attempts to dequeue an item one more time which may result in returning that item. Remember that the producer is likely writing items, so this collection may have received an item before we obtained the lock. If nothing is present in the collection, then nothing new will be added at this point, until the reader releases the lock.
The next conditional (still inside the lock) is intended to check if the channel _doneWriting field is set. If so, then no more writes will occur. In this case, since we know there is nothing in the items collection, and nothing more can be written, so GetInvalidCompletionValueTask will cause a ChannelClosedException to be thrown. Consumers are expected to handle this since it may occur at any point when reading the from the Channel.
The next block of code attempts to make use of the singleton reader instance if possible. This can only occur when the CancellationToken passed to the ReadAsync method cannot be cancelled. This may be the case when a default (CancellationToken.None) token is provided by consumers which call ReadAsync() with no arguments.
In that particular case, the UnboundedChannelReader is optimised to try to avoid allocating a new AsyncOperation<T> by using a pooled instance. This will always be possible if there is only a single consumer processing items from the Channel, for example. After accessing the singleton pooled instance, TryOwnAndReset is called to attempt to take ownership of the instance and reset its state.
Assuming ownership is achieved, the singleton reader is enqueued to the tail of the _blockedReaders DeQue (double-ended queue). The ValueTaskOfT property on the singleton AsyncOperation<T> is then accessed and returned. This returns a ValueTask<TResult> which is backed by the AsyncOperation.
In cases where the singleton reader cannot be used (the CancellationToken may be cancelled) or ownership of the singleton reader was not obtained, a new AsyncOperation<T> is created as the reader. This is then enqueued to the tail of _blockedReaders, and it’s ValueTask returned.
Enqueued blockedReaders will be accessed during the write operation (as we saw in the previous post) and handed an item directly when available.
This simple synchronous method supports an attempt to read a queued item from the Channel if one is available. It will return immediately.
It attempts to dequeue an item from the _items ConcurrentQueue. If an item is successfully dequeued, it is used to set the out parameter (item). CompleteIfDone is called, and as we learned from the ReadAsync code, it may mark the Channel complete if writing is complete and no further items are queued. Finally, the method returns true to the caller, indicating that an item was read.
If no queued items exit, the default item is set on the out parameter and false is returned.
WaitToReadAsync can be used by consumers to be notified asynchronously when there is data available to be read in the Channel. It returns a ValueTask<bool> which will complete with true when items are available, or false if no items will ever be available. We’ll again step through the code in small chunks, but the full code is available on GitHub.
Outside of any locking, two conditional code blocks may return a value. The first, as with ReadAsync will return a cancelled Task if the provided CancellationToken is already cancelled.
The second will check if the _items ConcurrentQueue is not empty. In that case, the method can return synchronously with a true value.
At this point, the code needs to lock over the parent SyncObj. After locking, it performs a second check to see if any items have potentially be added to the ConcurrentCollection between the first check and obtaining the lock. If so, it can return true.
The next condition checks if _doneWriting is not null, in which case, nothing new will be written. Since we are now in a lock and have checks that there are no items after locking, we can safely assert that we’ll never have anything to read. The value of the Exception referenced by _doneWriting, will dictate what needs to be returned here. If the Exception is not equal to the special s_doneWritingSentinel, used when writing is completed under normal circumstances, then a Task from the Exception is returned. Otherwise, the default value, false, is returned.
The next section of code is very similar to the code from ReadAsync. In cases where we know that cancellation cannot happen, it attempts to take ownership of a single waiter and if owned, queues the waiter to _waitingReadersTail. _waitingReadersTail is a field holding an AsyncOperation<bool> which may be null. An AsyncOperation may function as a kind of linked list of operations by holding a reference to the Next operation. ChannelUtilities.QueueWaiter is a helper method which sets the _waitingReadersTail by updating the chain of operations as necessary. If there is no current operation, then the singleton operation becomes the first entry. At this point, the ValueTaskOfT from the waiter operation is returned.
If the singleton waiter could not be used, then a new AsyncOperation<bool> is created and queued to the _waitingReadersTail before returning its ValueTaskOfT.
During writing, any queued waiters will be woken by setting their result to true when items are queued. Consumers must account for the fact that this code introduces a benign race condition, where the data may have been read by the time the continuation, where WaitToReadAsync was awaited, attempts to read an item.
The final method we’ll focus on in this post is only available in frameworks/libraries which support .NET standard 2.1. This adds IAsyncEnumerable support to the base ChannelReader<T>. This is therefore inherited by the UnboundedChannelReader.
This is pretty straightforward code which uses a nested while loop pattern. In the outer while loop, the code awaits WaitToReadAsync which may or may not complete asynchronously. When it completes, if the value is true, items are available to be read, and it then starts the inner while loop, using the synchronous TryRead method to return items until there are no longer any to return. At that point, TryRead returns false and the inner while loop exits. The outer while loop will then asynchronously await more items with WaitToReadAsync. That method will only return false if the channel is completed and no other items will be written, or an Exception has been thrown somewhere.
In this post, we explored the internals of the UnboundedChannelWriter methods. We learnt how the code is optimised for minimal locking and allocations to make it fast and efficient.
While I appreciate these internal details are not of interest for everyone, I find it very useful to learn how this fairly complex code works. It provides me with the knowledge to better use the types and hopefully become a better developer. I hope this series has been of interest and use to some and if you’ve made it this far, well done! Let me know if you enjoyed the series and would like to see more of these internal implementation posts by commenting below.