In part 1 of this mini-series, we began to explore the internals of the UnboundedChannel<T> type, learning about its class hierarchy and how an instance is created. In this post, we’ll continue our journey and focus on how items are written to an UnboundedChannel.
NOTE: This post was written while .NET 5 was in the release candidate phase. I don’t expect many code changes to channels in the immediate future. However, because we are studying internal implementation details, some details in this post may become outdated. You may want to check the source on GitHub for the most current code.
Writing to the UnboundedChannel<T> using UnboundedChannelWriter
Now that we have an instance of a channel (see part 1), we can allow a producer, or producers, to write to it.
When the UnboundedChannel was created, an instance of UnboundedChannelWriter was also created and used to set the Writer property on the base Channel class. UnboundedChannelWriter is defined as a nested private class within the UnboundedChannel. It overrides and implements methods from the abstract ChannelWriter<T> class from which it derives.
The UnboundedChannelWriter is used to write to the Channel. Several producer patterns are supported via the three write-focused methods: TryWrite, WaitToWriteAsync and WriteAsync.
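The following short usage sketch exercises all three write methods through the public System.Threading.Channels API. The class and method names (`WriteMethodsDemo`, `RunAsync`) are hypothetical. Everything else is the public API.

```csharp
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class WriteMethodsDemo
{
    // Writes one item with each write method, completes the writer,
    // then drains the reader and returns the items in the order read.
    public static async Task<List<int>> RunAsync()
    {
        Channel<int> channel = Channel.CreateUnbounded<int>();
        ChannelWriter<int> writer = channel.Writer;

        // TryWrite: synchronous; returns false only once writing has been completed.
        writer.TryWrite(1);

        // WaitToWriteAsync: on an unbounded channel this reports true straight away
        // unless the writer has been completed.
        if (await writer.WaitToWriteAsync())
        {
            writer.TryWrite(2);
        }

        // WriteAsync: faults with ChannelClosedException if writing has been completed.
        await writer.WriteAsync(3);

        writer.Complete();

        var results = new List<int>();
        await foreach (int item in channel.Reader.ReadAllAsync())
        {
            results.Add(item);
        }
        return results;
    }
}
```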
The majority of the channel writing code is within the TryWrite method, which attempts to write an item to the Channel and returns true when the write succeeds. Because this Channel is unbounded, writing is relatively straightforward: in most cases, there is no need to prevent writes. You can view the full and current code in the dotnet/runtime repository on GitHub. I’ll include relevant portions of the code as we discuss them.
TryWrite includes a while(true) loop, which runs until the item is either written successfully or the write fails. In most cases, one or two iterations should be enough to complete the write attempt.
Channels are optimised to avoid synchronisation overhead but cannot operate without some locking to ensure thread-safety. Each iteration of the loop begins by obtaining a lock over the _items object of the parent UnboundedChannel<T>, accessed through its SyncObj property.
Inside the lock, the first check determines whether the Channel has already been marked as completed for writing, in which case it’s not valid to accept any additional items. This is the only case where the method returns false and nothing is written.
The way that the UnboundedChannel tracks whether writing is complete is worth a mention here. Inside the Channel, a _doneWriting field is used, which may hold a reference to an exception. The absence of an exception (the field holds a null reference) indicates that the Channel is active and available for writing. If the _doneWriting field holds a reference to an exception, the Channel has been completed, either successfully or through some failure. A special Exception instance, used as a sentinel, represents the success case, which is noteworthy as it’s an uncommon use for an Exception. We’ll look at completion in more detail when we cover the TryComplete method.
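The sketch below models the `_doneWriting` convention in isolation to make the three states concrete. It is hypothetical and single-threaded for clarity. `CompletionStateSketch`, `DoneWritingSentinel`, `TryMarkDone` and `Describe` are illustrative names, not runtime APIs, and the real channel updates its field under the lock.

```csharp
using System;

public sealed class CompletionStateSketch
{
    // Hypothetical stand-in for the sentinel: an Exception that is never thrown,
    // only compared by reference to signal successful completion.
    public static readonly Exception DoneWritingSentinel = new Exception(nameof(DoneWritingSentinel));

    // null => still accepting writes; sentinel => completed successfully;
    // any other exception => completed with a failure.
    private Exception? _doneWriting;

    public bool TryMarkDone(Exception? error = null)
    {
        if (_doneWriting != null)
        {
            return false; // already completed; cannot complete twice
        }
        _doneWriting = error ?? DoneWritingSentinel;
        return true;
    }

    public string Describe() =>
        _doneWriting == null ? "active" :
        ReferenceEquals(_doneWriting, DoneWritingSentinel) ? "completed" :
        "faulted: " + _doneWriting.Message;
}
```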
The next conditional checks whether there are any blocked readers. A blocked reader occurs when a consumer of the Channel is awaiting the ReadAsync task on a ChannelReader while no items are currently queued. The consumer is waiting asynchronously to continue once the next item becomes available. The name implies that a thread could be blocked, but fear not: the Channels library fully supports asynchronous usage.
Blocked readers are tracked in the parent UnboundedChannel<T> in the _blockedReaders field. This field holds a reference to a Deque<AsyncOperation<T>>. Both of these types are internal to the Channels assembly and support functionality upon which Channels are built.
Deque represents a specialised collection, providing a double-ended queue data structure. Its core feature is support for adding and removing items at either the head or the tail, which gives it the properties of both a stack (LIFO) and a queue (FIFO). Because this type is internal to Channels, it’s not something we can use elsewhere today. An old GitHub issue is open to consider adding this data structure to the main framework.
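Because the runtime’s Deque<T> is internal, here is a hypothetical stand-in, `SimpleDeque<T>`, to make the idea concrete. It uses a circular array: items are added at the tail and can be removed from the head (queue-like) or the tail (stack-like). It is a sketch of the data structure, not the runtime’s implementation.

```csharp
using System;

// Hypothetical minimal double-ended queue backed by a circular array.
// Not thread-safe: as with the channel's internal deque, callers are expected to hold a lock.
public sealed class SimpleDeque<T>
{
    private T[] _array = new T[4];
    private int _head;   // index of the first (oldest) item
    private int _size;   // number of items stored

    public bool IsEmpty => _size == 0;
    public int Count => _size;

    public void EnqueueTail(T item)
    {
        if (_size == _array.Length)
        {
            Grow();
        }
        _array[(_head + _size) % _array.Length] = item;
        _size++;
    }

    // FIFO removal: oldest item first.
    public T DequeueHead()
    {
        if (_size == 0) throw new InvalidOperationException("Deque is empty.");
        T item = _array[_head];
        _array[_head] = default!; // drop the reference so it can be collected
        _head = (_head + 1) % _array.Length;
        _size--;
        return item;
    }

    // LIFO removal: newest item first.
    public T DequeueTail()
    {
        if (_size == 0) throw new InvalidOperationException("Deque is empty.");
        int tail = (_head + _size - 1) % _array.Length;
        T item = _array[tail];
        _array[tail] = default!;
        _size--;
        return item;
    }

    // Doubles capacity, unrolling the circular layout so the head moves to index 0.
    private void Grow()
    {
        var newArray = new T[_array.Length * 2];
        for (int i = 0; i < _size; i++)
        {
            newArray[i] = _array[(_head + i) % _array.Length];
        }
        _array = newArray;
        _head = 0;
    }
}
```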
AsyncOperation<T> is used to represent the specifics of an asynchronous operation that has a result value. What’s special about this type is that it implements IValueTaskSource and IValueTaskSource<TResult> to support reduced allocations when awaiting operations on the Channel. We’re starting to get into some pretty deep and complex territory at this point. We’ll try to steer around most of that complexity and boil it down to the following:
ValueTask<T> and ValueTask were introduced in .NET Core 2.0 and .NET Core 2.1 respectively. The principle behind these types is to reduce allocations on asynchronous code paths by avoiding Task allocations for code that can complete synchronously. Channels are a good fit for this case, because a consumer may read from the Channel while it already has items in its internal queue. When this is the case, the method can return synchronously and avoid allocating a Task to wrap the result. Only when there are no items available does the consumer truly need to await a Task asynchronously.
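The synchronous fast path is visible from the public API. In this hypothetical `SyncReadDemo`, an item is written before ReadAsync is called, so the returned ValueTask<int> is expected to be already completed, with no Task allocated. Treat `IsCompletedSuccessfully` being true here as observed behaviour of the current implementation rather than a documented guarantee.

```csharp
using System.Threading.Channels;
using System.Threading.Tasks;

public static class SyncReadDemo
{
    // Writes an item first, then reads it, reporting whether ReadAsync
    // returned an already-completed ValueTask<int>.
    public static (bool CompletedSynchronously, int Value) ReadWhenItemIsQueued()
    {
        Channel<int> channel = Channel.CreateUnbounded<int>();
        channel.Writer.TryWrite(42); // queued before any reader arrives

        ValueTask<int> pending = channel.Reader.ReadAsync();
        bool completedSynchronously = pending.IsCompletedSuccessfully;

        // Consume the ValueTask exactly once; only block if it unexpectedly
        // did not complete synchronously.
        int value = completedSynchronously
            ? pending.Result
            : pending.AsTask().GetAwaiter().GetResult();
        return (completedSynchronously, value);
    }
}
```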
In .NET Core 2.1, the IValueTaskSource<T> interface was added to support further advanced optimisations. By implementing the interface, a developer may provide an awaitable type that can be wrapped with a ValueTask<T>. The main advantage here is control, since the developer may now pool or cache instances of that awaitable implementation and reuse the same instance time and time again. This further avoids Task allocations in cases where the code needs to execute asynchronously. When the consumer(s) process data more quickly than it is produced, we end up on an async path. Rather than allocate a new Task for each read, a pooled instance of AsyncOperation<T> may, where possible, be reused and awaited by the calling code.
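Here is a minimal, hypothetical reusable source, `ReusableValueTaskSource<T>`, to show what implementing IValueTaskSource<T> involves. It does not reproduce AsyncOperation<T>. Instead, it delegates to the public ManualResetValueTaskSourceCore<T> helper struct (System.Threading.Tasks.Sources, available since .NET Core 3.0). One instance backs many ValueTask<T>s, one at a time. Reset bumps the version, so a stale token from an earlier operation is rejected.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Sources;

// Hypothetical reusable source: one instance can back many ValueTask<T>s, one at a time.
public sealed class ReusableValueTaskSource<T> : IValueTaskSource<T>
{
    // Mutable struct: must live in a non-readonly field so its state is not copied.
    private ManualResetValueTaskSourceCore<T> _core;

    public ReusableValueTaskSource()
    {
        // Run continuations asynchronously so SetResult does not execute
        // the awaiting code inline on the producer's thread.
        _core.RunContinuationsAsynchronously = true;
    }

    // Hands out a ValueTask<T> tied to the current version of this source.
    public ValueTask<T> GetValueTask() => new ValueTask<T>(this, _core.Version);

    public void SetResult(T result) => _core.SetResult(result);

    // Prepares the instance for reuse and invalidates previously issued tokens.
    public void Reset() => _core.Reset();

    T IValueTaskSource<T>.GetResult(short token) => _core.GetResult(token);

    ValueTaskSourceStatus IValueTaskSource<T>.GetStatus(short token) => _core.GetStatus(token);

    void IValueTaskSource<T>.OnCompleted(Action<object?> continuation, object? state,
        short token, ValueTaskSourceOnCompletedFlags flags) =>
        _core.OnCompleted(continuation, state, token, flags);
}
```

The core struct is kept in a mutable field because copying it would split its state. Reusing one instance per operation, instead of allocating a Task each time, is the same idea the article describes for AsyncOperation<T>.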
For further (more authoritative) reading on ValueTask<T> and IValueTaskSource<T>, I recommend starting with these great blog posts:
- Understanding the Whys, Whats, and Whens of ValueTask
- Task, Async Await, ValueTask, IValueTaskSource and how to keep your sanity in modern .NET world
- Implementing custom IValueTaskSource – async without allocations
- Prefer ValueTask to Task, always; and don’t await twice
Right, where were we? Oh yeah, we’re inside TryWrite where the code is checking for blocked readers.
We enter the if block when there are no blocked readers. In this situation, the code must Enqueue the item into the ConcurrentQueue (_items). The next lines of code then take a reference to an AsyncOperation<bool> from the parent. The _waitingReadersTail field on the UnboundedChannel<T> stores a linked list of operations (AsyncOperation<bool>), and operations are added to this chain whenever a consumer calls WaitToReadAsync on the reader. If this reference is null, there are no waiting readers to notify, so the method can return true, indicating that the item was added successfully. When there is an AsyncOperation<bool>, the reference is kept in a local variable and the parent’s reference is set to null. Since we are inside a lock, this occurs in a thread-safe manner.
In part one, I intentionally said that items “may be stored” in the ConcurrentQueue<T>. The else block here helps us understand this statement a little better. When at least one consumer is asynchronously awaiting ReadAsync, we hand the item off to the consumer directly rather than queue it. This avoids potentially growing the ConcurrentQueue’s internal storage, which saves execution time and possibly allocations. Inside the else block, a blocked reader is dequeued from the head of the Deque<AsyncOperation<T>> (_blockedReaders) double-ended queue.
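The hand-off path can be observed from outside the channel. In this hypothetical `HandOffDemo`, a consumer calls ReadAsync on an empty channel before the producer writes. Since TryWrite passes the item straight to that blocked reader instead of queueing it, a TryRead immediately afterwards is expected to find nothing.

```csharp
using System.Threading.Channels;
using System.Threading.Tasks;

public static class HandOffDemo
{
    public static async Task<(int Received, bool QueueHadLeftover)> RunAsync()
    {
        Channel<int> channel = Channel.CreateUnbounded<int>();

        // No items yet, so this read cannot complete synchronously: it becomes a blocked reader.
        ValueTask<int> pendingRead = channel.Reader.ReadAsync();

        // TryWrite finds the blocked reader and hands the item to it directly.
        channel.Writer.TryWrite(7);

        // Nothing was enqueued, so TryRead has nothing to take.
        bool leftover = channel.Reader.TryRead(out _);

        int received = await pendingRead;
        return (received, leftover);
    }
}
```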
At this point, one of two things has happened. Either the item was enqueued and there are waiting readers to notify, or a blocked reader was dequeued to receive the item directly. In both cases, the code flow now exits the lock. The Channels implementations try to hold locks for as short a period as possible.
The final conditional first checks whether the local blockedReader variable holds a reference to a waiting reader. Remember, this is a consumer that has called ReadAsync. If the variable is not null, the code attempts to hand off the item by calling TrySetResult on the AsyncOperation<T>. This can fail if the reader has been cancelled, so success is not guaranteed. If the reader is still active, the result is set, and the caller’s continuation can run to process the item. If the reader can no longer accept an item because it was cancelled, the code flow exits the if block, completing the first loop iteration (recall we’re inside a while(true) loop). The next iteration can try again to enqueue the item or locate another blocked reader to send it to.
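Cancellation can be exercised in the same way. In this hypothetical `CancelledReaderDemo`, a cancellable ReadAsync is cancelled before any item arrives. A given runtime version may skip the cancelled reader via the retry loop described above or clean it up some other way. In either case, the observable outcome should be the same: TryWrite returns true and the item waits in the queue for the next consumer.

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class CancelledReaderDemo
{
    public static async Task<(bool ReadWasCancelled, bool Written, int NextRead)> RunAsync()
    {
        Channel<int> channel = Channel.CreateUnbounded<int>();
        using var cts = new CancellationTokenSource();

        // Register a blocked reader that can be cancelled, then cancel it.
        ValueTask<int> cancelledRead = channel.Reader.ReadAsync(cts.Token);
        cts.Cancel();

        bool readWasCancelled = false;
        try
        {
            await cancelledRead;
        }
        catch (OperationCanceledException)
        {
            readWasCancelled = true;
        }

        // The cancelled reader cannot accept the item, so it is not lost:
        // it ends up queued for the next consumer.
        bool written = channel.Writer.TryWrite(99);
        int nextRead = channel.Reader.TryRead(out int item) ? item : -1;
        return (readWasCancelled, written, nextRead);
    }
}
```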
The final block is the else case, where there is no blocked reader. When execution reaches here, the item has been enqueued, and there is at least one pending WaitToReadAsync operation. The static ChannelUtilities class is used to wake up all waiters. Comments in the original code acknowledge that, since we’re outside the lock, waiters may be woken even though the queued item has already been processed by another consumer. This is not considered a problem, as consumers are expected to account for that possibility in their consuming code.
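The following deliberately simplified, hypothetical model, `MiniUnboundedChannel<T>`, pulls the TryWrite steps together. It is not the runtime code. It substitutes TaskCompletionSource<T> for AsyncOperation<T>, Queue<T> for both the internal Deque<T> and the ConcurrentQueue<T>, and a List<TaskCompletionSource<bool>> for the linked list of waiting readers. What it keeps is the shape of the algorithm:

- check for completion under the lock;
- either enqueue the item and detach the waiters, or take one blocked reader;
- finish the work outside the lock, looping if that reader was already cancelled.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical, simplified model of the UnboundedChannel<T> write path (not the runtime code).
public sealed class MiniUnboundedChannel<T>
{
    private static readonly Exception s_doneWritingSentinel = new Exception("done writing");

    private readonly Queue<T> _items = new Queue<T>();
    private readonly Queue<TaskCompletionSource<T>> _blockedReaders = new Queue<TaskCompletionSource<T>>();
    private List<TaskCompletionSource<bool>>? _waitingReaders;
    private Exception? _doneWriting;

    // As in the article, the item collection doubles as the lock object.
    private object SyncObj => _items;

    public bool TryWrite(T item)
    {
        while (true)
        {
            TaskCompletionSource<T>? blockedReader = null;
            List<TaskCompletionSource<bool>>? waitingReaders = null;

            lock (SyncObj)
            {
                // The only failure case: writing has already been completed.
                if (_doneWriting != null) return false;

                if (_blockedReaders.Count == 0)
                {
                    _items.Enqueue(item);
                    waitingReaders = _waitingReaders;
                    if (waitingReaders == null) return true; // nobody to notify
                    _waitingReaders = null; // detach under the lock, notify outside it
                }
                else
                {
                    blockedReader = _blockedReaders.Dequeue(); // hand off instead of queueing
                }
            }

            if (blockedReader != null)
            {
                // Fails if this reader was cancelled; in that case, loop and try again.
                if (blockedReader.TrySetResult(item)) return true;
            }
            else
            {
                // Wake every waiter outside the lock; some may find the item already taken.
                foreach (TaskCompletionSource<bool> waiter in waitingReaders!) waiter.TrySetResult(true);
                return true;
            }
        }
    }

    // Minimal reader-side helpers, just enough to exercise TryWrite.
    public Task<T> ReadAsync(CancellationToken cancellationToken = default)
    {
        lock (SyncObj)
        {
            if (_items.Count > 0) return Task.FromResult(_items.Dequeue());
            var reader = new TaskCompletionSource<T>(TaskCreationOptions.RunContinuationsAsynchronously);
            // A cancelled reader stays queued and is skipped by TryWrite.
            // For brevity, the registration is never disposed.
            cancellationToken.Register(() => reader.TrySetCanceled(cancellationToken));
            _blockedReaders.Enqueue(reader);
            return reader.Task;
        }
    }

    public Task<bool> WaitToReadAsync()
    {
        lock (SyncObj)
        {
            if (_items.Count > 0) return Task.FromResult(true);
            if (_doneWriting != null) return Task.FromResult(false);
            var waiter = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
            (_waitingReaders ??= new List<TaskCompletionSource<bool>>()).Add(waiter);
            return waiter.Task;
        }
    }

    public bool TryRead(out T item) { lock (SyncObj) { return _items.TryDequeue(out item); } }

    // Marks writing as done. A full version would also release blocked and waiting readers.
    public void CompleteWriting() { lock (SyncObj) { _doneWriting ??= s_doneWritingSentinel; } }
}
```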
We’ve now covered TryWrite in some detail, and our exploration of the implementation has helped us appreciate the mechanics of how items are written efficiently. This is the most complex writing method for the UnboundedChannel<T>.
We’ll briefly look at the two other methods on the ChannelWriter implementation.
For unbounded channels, the logic for WaitToWriteAsync is quite straightforward: while the Channel is active, writing is always possible because its capacity is unlimited. A ValueTask<bool> is returned, indicating whether an item can be written.
If cancellation is requested by the caller via their CancellationToken, a ValueTask<bool> wrapping a cancelled Task is returned.
If the _doneWriting field on the parent channel is null, writing can always take place, since the Channel capacity is unbounded. You’ll recall that this field holds a reference to an Exception once the Channel has been completed, either successfully or with an error. While this field is null, the Channel is still active and available for writing.
If _doneWriting is not null and the Exception is not ChannelUtilities.s_doneWritingSentinel, writing was completed with an error, so a faulted Task is created from that Exception and returned wrapped in a ValueTask<bool>.
If _doneWriting is ChannelUtilities.s_doneWritingSentinel, the default ValueTask<bool> is returned, whose result is false. This indicates that writing cannot proceed because the ChannelWriter has been completed. The static field s_doneWritingSentinel on ChannelUtilities provides a special Exception instance used as a sentinel object to indicate completion.
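The same decision sequence can be written as a small standalone function. This hypothetical sketch (`WriteGateSketch`) takes the done-writing state as a parameter and uses a local stand-in for the sentinel. It checks the conditions in the order described above.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class WriteGateSketch
{
    // Hypothetical stand-in for ChannelUtilities.s_doneWritingSentinel.
    public static readonly Exception DoneWritingSentinel = new Exception(nameof(DoneWritingSentinel));

    // 'doneWriting' plays the role of the parent channel's _doneWriting field.
    public static ValueTask<bool> WaitToWriteAsync(Exception? doneWriting, CancellationToken cancellationToken = default)
    {
        if (cancellationToken.IsCancellationRequested)
        {
            return new ValueTask<bool>(Task.FromCanceled<bool>(cancellationToken));
        }
        if (doneWriting == null)
        {
            return new ValueTask<bool>(true); // unbounded and still active: writing is possible
        }
        if (doneWriting != DoneWritingSentinel)
        {
            return new ValueTask<bool>(Task.FromException<bool>(doneWriting)); // completed with an error
        }
        return default; // completed successfully: result is false, nothing allocated
    }
}
```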
For unbounded channels, the logic for WriteAsync is also relatively straightforward.
If cancellation is requested by the caller, a ValueTask wrapping a cancelled Task is returned. Otherwise, TryWrite is called. If it succeeds, a default ValueTask is returned, which represents a successfully completed operation without allocating a Task. If TryWrite fails, an exception is created via ChannelUtilities.CreateInvalidCompletionException, passing in the _doneWriting exception from the parent channel, and a ValueTask wrapping a faulted Task is returned.
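WriteAsync’s behaviour on an unbounded channel can be checked through the public API. In this hypothetical `WriteAsyncDemo`, a write to an open channel is expected to return an already-completed ValueTask. A write after Complete() is expected to fault with a ChannelClosedException, matching the logic just described.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class WriteAsyncDemo
{
    // Reports whether a WriteAsync on an open unbounded channel completed synchronously,
    // and the exception type observed when writing after completion.
    public static async Task<(bool CompletedSynchronously, Type? ErrorAfterComplete)> RunAsync()
    {
        Channel<int> channel = Channel.CreateUnbounded<int>();

        ValueTask write = channel.Writer.WriteAsync(1);
        bool completedSynchronously = write.IsCompletedSuccessfully;
        await write;

        channel.Writer.Complete();

        Type? errorType = null;
        try
        {
            await channel.Writer.WriteAsync(2);
        }
        catch (Exception ex)
        {
            errorType = ex.GetType();
        }
        return (completedSynchronously, errorType);
    }
}
```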
There are three possible return values for this helper method. If the provided inner Exception is an OperationCanceledException, it is returned directly so that the caller can cooperate with the cancellation of the operation. If the Exception is not null and is not the special s_doneWritingSentinel, a new ChannelClosedException is created, wrapping the inner Exception. The final possibility is that the inner Exception is the s_doneWritingSentinel (or null), so a ChannelClosedException is created with no inner exception.
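The three outcomes map onto a short helper. In this hypothetical sketch, `CompletionExceptionSketch` and its `DoneWritingSentinel` stand in for the internal ChannelUtilities members, while ChannelClosedException is the real public exception type.

```csharp
using System;
using System.Threading.Channels;

public static class CompletionExceptionSketch
{
    // Hypothetical stand-in for ChannelUtilities.s_doneWritingSentinel.
    public static readonly Exception DoneWritingSentinel = new Exception(nameof(DoneWritingSentinel));

    public static Exception CreateInvalidCompletionException(Exception? inner = null)
    {
        if (inner is OperationCanceledException)
        {
            return inner; // returned as-is so callers observe cancellation
        }
        if (inner != null && inner != DoneWritingSentinel)
        {
            return new ChannelClosedException(inner); // wrap the real failure
        }
        return new ChannelClosedException(); // clean completion: sentinel or no inner exception
    }
}
```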
We’re pretty much done with our deep dive into the internals of UnboundedChannelWriter. Before I wrap up, it’s worth digging into how Channel writing is marked completed so that the Channel can signal to consumers when they should also complete. The ChannelWriter<T> abstract class supports two completion methods, Complete and TryComplete.
UnboundedChannelWriter overrides the virtual TryComplete method from the ChannelWriter<T> base class. The complete code for this method can be found on GitHub. I’ll show the relevant pieces of code as we discuss them.
This is another operation which requires thread-safe synchronisation, so a lock over the SyncObj (_items field) is obtained.
The condition checks if the parent Channel is already marked as done, which is the case if _doneWriting is non-null. If the Channel is already marked as complete for writing, the TryComplete method returns false, since we can’t complete the Channel writing twice.
The parent _doneWriting field is then set, either with the error Exception (if the argument is not null) or with the s_doneWritingSentinel from ChannelUtilities. Remember that although s_doneWritingSentinel is an Exception, it is used when Channel writing is marked as completed without failure. This ensures the field is non-null for any later checks against _doneWriting. We saw some examples of such checks when we looked at TryWrite.
The final code that executes inside the lock assigns the local variable completeTask, which is set to true if the items collection is currently empty. Because we still hold the lock and _doneWriting is now set, an empty collection at this point will never receive further items: writing code must obtain the lock and check _doneWriting before it can add an item.
Execution can now leave the lock. The remaining operations don’t need it to be thread-safe, and they may run synchronous completions, which are best kept outside the lock.
The next condition checks to see if completeTask is true. If this is the case, then the Channel is completed using the ChannelUtilities.Complete helper method, otherwise it’s up to any readers to complete the channel once all items have been consumed. Note that in this code, completion of the Channel occurs before waking up any waiting readers so that when they execute, they will see that the Channel is now marked as completed and can themselves be completed.
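The deferred-completion case can be observed through the public API. In this hypothetical `DeferredCompletionDemo`, an item is still queued when TryComplete is called. Reader.Completion is therefore expected to stay incomplete until a reader drains that last item. The demo also shows a second TryComplete returning false.

```csharp
using System.Threading.Channels;

public static class DeferredCompletionDemo
{
    public static (bool FirstTry, bool SecondTry, bool CompletedBeforeDrain, bool CompletedAfterDrain) Run()
    {
        Channel<int> channel = Channel.CreateUnbounded<int>();
        channel.Writer.TryWrite(1);

        bool firstTry = channel.Writer.TryComplete();  // marks writing as done
        bool secondTry = channel.Writer.TryComplete(); // already done: returns false

        // An item is still queued, so completing the channel is left to the reader.
        bool completedBeforeDrain = channel.Reader.Completion.IsCompleted;

        channel.Reader.TryRead(out _); // draining the last item completes the channel
        bool completedAfterDrain = channel.Reader.Completion.IsCompleted;

        return (firstTry, secondTry, completedBeforeDrain, completedAfterDrain);
    }
}
```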
Let’s take a quick look at the Complete method on ChannelUtilities.
This code completes the TaskCompletionSource of the parent Channel with the appropriate completion state.
If the error Exception parameter is an OperationCanceledException, the TaskCompletionSource is set as cancelled. If the error Exception is not null and is not the done writing sentinel, the TaskCompletionSource is completed with the Exception. Otherwise, writing was completed without failure, so a default result is set on the TaskCompletionSource.
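The three-way decision can be sketched as follows. The sketch is hypothetical: `CompletionSketch` uses a TaskCompletionSource<bool> and a local sentinel, whereas the runtime completes its own internal TaskCompletionSource and compares against ChannelUtilities.s_doneWritingSentinel.

```csharp
using System;
using System.Threading.Tasks;

public static class CompletionSketch
{
    // Hypothetical stand-in for the done-writing sentinel.
    public static readonly Exception DoneWritingSentinel = new Exception(nameof(DoneWritingSentinel));

    // Transitions the completion source according to how writing finished.
    public static void Complete(TaskCompletionSource<bool> tcs, Exception? error = null)
    {
        if (error is OperationCanceledException oce)
        {
            tcs.TrySetCanceled(oce.CancellationToken); // cancellation surfaces as a cancelled task
        }
        else if (error != null && error != DoneWritingSentinel)
        {
            tcs.TrySetException(error); // a genuine failure faults the completion task
        }
        else
        {
            tcs.TrySetResult(default); // clean completion
        }
    }
}
```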
The Task from the TaskCompletionSource is exposed on the ChannelReader<T> so that consumers may await the Task to propagate any exceptions.
Back in UnboundedChannelWriter.TryComplete…
The final code handles any blocked or waiting read operations that are not already underway. By this point, other threads will no longer mutate the _blockedReaders and _waitingReaders fields. Only readers mutate them, and only while holding the lock and while _doneWriting is null, which is no longer the case. The code can therefore manipulate these fields without any concurrency concerns.
Any AsyncOperation<T> instances in _blockedReaders (consumers that have called and awaited ReadAsync) are now marked as failed with a ChannelClosedException. Any waiting readers (WaitToReadAsync) are woken up and completed either with an exception (if one was provided) or with a result of false. Consumers then continue and should break out of their read loops to begin completing their work.
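The following hypothetical `CompletionWakesReadersDemo` shows this from a consumer’s perspective. A ReadAsync and a WaitToReadAsync are both pending on an empty channel when the writer completes. The read is expected to fail with ChannelClosedException, and the wait is expected to return false.

```csharp
using System.Threading.Channels;
using System.Threading.Tasks;

public static class CompletionWakesReadersDemo
{
    public static async Task<(bool ReadFailedWithClosed, bool WaitResult)> RunAsync()
    {
        Channel<int> channel = Channel.CreateUnbounded<int>();

        // Both calls find an empty, still-open channel, so both must wait.
        ValueTask<int> blockedRead = channel.Reader.ReadAsync();
        ValueTask<bool> waitingRead = channel.Reader.WaitToReadAsync();

        channel.Writer.Complete();

        bool readFailedWithClosed = false;
        try
        {
            await blockedRead;
        }
        catch (ChannelClosedException)
        {
            readFailedWithClosed = true;
        }

        bool waitResult = await waitingRead; // false: no more data will ever arrive
        return (readFailedWithClosed, waitResult);
    }
}
```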
TryComplete now returns true since the completion for the ChannelWriter has succeeded.
The Complete method on the ChannelWriter<T> base class is straightforward. It calls the TryComplete method, which UnboundedChannelWriter overrides. If TryComplete returns false, indicating that the attempt to complete failed, an exception is created using the CreateInvalidCompletionException helper on ChannelUtilities and thrown. We looked at that code earlier, and since no Exception argument is provided, we know the result is a new ChannelClosedException being thrown.
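The failure path of Complete is easy to see with the public API. In this hypothetical `CompleteTwiceDemo`, the second call to Complete is expected to throw a ChannelClosedException with no inner exception, as just described.

```csharp
using System.Threading.Channels;

public static class CompleteTwiceDemo
{
    // Returns the exception raised by a second call to Complete, or null if none was thrown.
    public static ChannelClosedException? CompleteTwice()
    {
        Channel<int> channel = Channel.CreateUnbounded<int>();
        channel.Writer.Complete();
        try
        {
            channel.Writer.Complete(); // TryComplete returns false, so Complete throws
            return null;
        }
        catch (ChannelClosedException ex)
        {
            return ex;
        }
    }
}
```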
This has been quite a long, deep dive into the internal implementation details for the UnboundedChannelWriter, used on an UnboundedChannel<T> to support writing operations. We explored the three methods available for writing to a Channel, stepping through their implementation. We saw that most of the writing logic for an UnboundedChannel is contained within TryWrite. A key takeaway is that due to the unbounded nature of this implementation, the async write operations (WriteAsync and WaitToWriteAsync) will always complete synchronously. This is made efficient through the use of ValueTask and ValueTask<T>.
We then looked at how a Channel is marked as completed, indicating that no more items will ever be written. This allows consumers to complete once they drain any queued items.
Much of this detail is not necessary to use the Channels library, but in complex situations it may be helpful to understand what is actually happening inside the Channel. I found it really interesting to dig into the code and see some real-world examples of how IValueTaskSource<T> can be implemented in high-performance areas of code.
Join me in part 3, where we’ll complete our journey by reviewing how items are read from an UnboundedChannel<T> via its UnboundedChannelReader implementation.