Асинхронные потоки от Стивена Клири

Про Стивена Клири можно сказать «он всерьез занялся многопоточным программированием еще до того, как это стало мейнстримом». Клири стал палочкой-выручалочкой для тысяч разработчиков, терпеливо объясняя на StackOverflow, почему программы не работают и как их исправить.

Еще в 2019-м на нашей конференции DotNext Стивен рассказал об асинхронных потоках: чем они могут быть полезны, чем отличаются от существующих библиотек вроде System.Reactive и RxJS, а также как задействовать их в проектах.

Спустя пару лет этот доклад остается полезным — поэтому теперь, в преддверии нового DotNext, мы решили сделать для Хабра текстовый перевод доклада Стивена. Видео тоже есть под катом. Далее повествование будет от лица спикера.

Ссылка на репозиторий

Слайды

Асинхронное вторжение

Microsoft первым использовал async/await, но они вышли за пределы языков Microsoft. Сперва С# 5.0, Typescript 1.6. Через некоторое время присоединились Python 3.5 и JavaScript ES2017. Async/await — уже несколько лет официальная часть языка JavaScript. И мне кажется очень интересным, что async/await, то есть что-то, изобретенное в Microsoft, также было принято в языках, которые достаточно сильно отличаются друг от друга.

Сперва в Microsoft сказали, что в C# и TypeScript асинхронность поддерживается на уровне языка, после чего фича появилась в Python, где решения принимаются сообществом, а не конкретной корпорацией. Наконец, в мире JavaScript различные компании, разработчики браузеров и языков заявили: «Вот что мы хотим во всемирном стандарте для нашего языка». Это три очень разных стиля разработки языков, но несмотря на это все они внедрили async/await очень похожими способами, что мне кажется весьма показательным.

Асинхронные потоки/генераторы

Следующий шаг в развитии async/await — это асинхронные потоки (async streams) или асинхронные генераторы (async generators). Они в первую очередь появились в Python 3.6, словно разработчики подумали: «async/await — отличная синтаксическая конструкция, так что добавим-ка к ним еще и асинхронные потоки». Их также внедрили в Typescript 2.3 и Javascript ES2018, а в 2019 году они стали частью C# 8.0.

Асинхронные потоки встроены прямо в .NET Core 3 и следующие версии .NET. В .NET Framework 4 для работы с асинхронными потоками потребуется установить NuGet-пакет.

На DotNext мы будем говорить только о C# и время от времени упоминать другие языки.

Зачем нам асинхронные потоки?

Давайте сперва разберемся, зачем нам вообще могут понадобиться асинхронные потоки и асинхронный ввод/вывод. Вот главные преимущества, которые можно получить при работе с async/await, да и с любым асинхронным кодом.

Для клиента

Время отклика: мы не блокируем UI-поток, и GUI или мобильное приложение становится более отзывчивым по отношению к пользователю.

Для сервера

Масштабируемость: вы освобождаете потоки, сервер может обрабатывать больше запросов, а также намного эффективнее справляться с резким увеличением трафика.

Кроме того, асинхронный код позволяет избегать низкоуровневых абстракций вроде Thread.

Подходы к использованию асинхронных потоков

Давайте посмотрим конкретно на асинхронные потоки. Я выделяю три основных подхода к асинхронных потокам:

  • enumerables
  • tasks
  • observables.

Enumerables

Сравним асинхронные потоки и энумераторы. Enumerable — это что-то вроде iterator block или generator block. Enumerables в .NET всегда синхронные. Любой LINQ — запрос в версии до .NET Core всегда будет синхронным, будь то LINQ2Objects или LINQ2SQL.

Примечание переводчика. ToListAsync существовал в Entity Framework и раньше, но это extension-метод, объявленный именно в коде фреймворка, а не в стандартной библиотеке.

Однако мы могли бы сделать «асинхронизацию» во время перечисления. Это и будут асинхронные потоки, но об этом позже.

Tasks

Чаще всего async/await используется именно с тасками. Таск может вернуть одно значение или список, но есть проблема — таски завершаются единожды. Задача может завершиться успешно, ошибкой или отменой, но лишь единожды.

Нам нужна возможность сказать: «Вот значение» и через некоторое время добавить: «А вот еще одно значение». Хочется генерировать несколько результатов асинхронно, а не одновременно. Это одна из весомых причин использования асинхронных потоков.

Observables

Люди редко используют reactive-расширения, и не зря. Мне кажется, это не самое понятное решение для программиста.

Observables представляют собой нечто асинхронное с несколькими значениями, как мы и хотели. Они позволяют определить асинхронный поток, который даст нам одно значение сейчас и еще одно чуть позже.

Reactive extensions отличаются высоким порогом входа. Большинство манипуляций с потоками здесь осуществляется с помощью операторов. Мы ищем что-то, с чем можно использовать стандартный синтаксис языка, например, foreach, который нельзя использовать с observable.

Сравнение трех подходов

Синхронный/асинхронный Одно/несколько значений push/pull-based
IEnumerable<T> синхронный несколько pull
Task<T> асинхронный одно pull
IObservable<T> асинхронный несколько push
IAsyncEnumerable<T> асинхронный несколько pull

У enumerables и tasks низкий порог входа, и они тоже часть языка: enumerables можно перебирать с помощью foreach, таски можно ждать с помощью await. В мире .NET редко используются observables, хоть они сначала и появились именно там.

Сообщество JavaScript использует observables намного активнее. Скорее всего, причина в том, что в JavaScript всего один поток, а значит, их проще там изучить и использовать. В С# мы хотим нечто похожее на observables, что будет лучше интегрировано в язык. Что-то более простое в понимании и использовании.

Observable на самом деле push-based: когда у observable есть поток, вы подписываетесь на этот поток, после чего этот поток отправляет данные. Таски и перечисления, наоборот, pull-based — когда мы используем foreach на любом энумераторе или LINQ-запросе, мы достаем результаты по одному. В язык более естественно встраиваются именно pull-based-решения.

AsyncronousEnumerable лучше всего подходит, когда нужно вернуть несколько значений асинхронно. Такой подход отличается от observables, которые, напоминаю, push-based.

Асинхронные потоки дополняют все перечисленные подходы, а не заменяют их.

Асинхронные потоки

Давайте взглянем на асинхронные потоки. Если вкратце, то это возможность в одном методе использовать одновременно await и yield return.

Yield return — достаточно специфичная конструкция, хотя и используется широким кругом людей. Посмотрим, как работает yield return.

Демо

static void YieldReturnMain()
{
    foreach (int item in YieldReturn())
        Console.WriteLine($"Got {item}");
}

static IEnumerable<int> YieldReturn()
{
    // Deferred execution!
    yield return 1;
    yield return 2;
    yield return 3;
}

Вызовем этот метод, он будет возвращать enumerable, в котором три значения, после чего мы с помощью foreach достанем каждое из значений.

Когда мы достигаем точки остановки на «yield return 1;», возвращается первое значение, и оно выводится в командную строку. То же самое со вторым значением. Интересно, что yield return останавливает программу, заставляя компилятор создать конечный автомат.

Этот метод разделяется на секции, и в каждом месте, где находится yield return, возвращается новое значение в этом enumerable. Каждый раз, когда выполнение передается в конечный автомат секции метода, доступно только локальное состояние, а не весь call stack. Это основная идея enumerables.

Рассмотрим еще пример кода, иллюстрирующий работу enumerable:

static void YieldReturnMain()
{
     foreach (int item in YieldReturn())
         Console.WriteLine($"Got {item}");

     // same as:
     using (IEnumerator<int> enumerator = YieldReturn().GetEnumerator())
     {
            while (enumerator.MoveNext())
            {
                int item = enumerator.Current;
                Console.WriteLine($"Got {item}");
            }
     }
}

static IEnumerable<int> YieldReturn()
{
    // Deferred execution!
    yield return 1;
    yield return 2;
    yield return 3;
}

Foreach идентичен методу, который написан после него. Интересно, что метод MoveNext обращается к конечному автомату, поэтому когда мы первый раз вызываем GetEnumerator, он сперва создает автомат, после чего каждый раз при вызове метода MoveNext идет обращение к конечному автомату. Это очень похоже на работу async/await. Конструкция async/await тоже создает конечный автомат, сохраняет локальное состояние и разбивает метод на секции, соответствующие каждому вызову метода await. Поэтому при отладке асинхронного кода можно заметить, что большинство исключений выбрасывается из метода MoveNext.

Я никогда не уточнял у Microsoft, но мне всегда казалось, что тот, кто написал async/await, посмотрел на задачу и подумал: «Мы же практически то же самое делаем с enumerable, я просто возьму и скопирую». В итоге этот странный вызов MoveNext теперь присутствует во всех трассировках async/await-код. Название метода выглядит нелогичным в отрыве от контекста. Но если предположить, что этот код заимствован из реализации enumerable, то все встает на свои места: MoveNext — это переход к следующему элементу в перечислении.

Async Enumerable

Давайте перейдем к асинхронным энумераторам.

Демо

static async Task AwaitAndYieldReturnMain()
{
    await foreach (int item in AwaitAndYieldReturn())
         Console.WriteLine($"Got {item}");
}

static async IAsyncEnumerable<int> AwaitAndYieldReturn()
{
    await Task.Delay(TimeSpan.FromSeconds(1)); // pause (await)
    yield return 1; // pause (produce value)
    await Task.Delay(TimeSpan.FromSeconds(1)); // pause (await)
    yield return 2; // pause (produce value)
    await Task.Delay(TimeSpan.FromSeconds(1)); // pause (await)
    yield return 3; // pause (produce value)
}

Здесь все то же самое, за исключением того, что метод enumerable стал асинхронным. Это очень простой код, предназначенный только для демонстрации: перед каждым yeild мы ждем одну секунду и возвращаем значение. Обратите внимание, что здесь используется await foreach — новая синтаксическая конструкция C# 8.0, позволяющая использовать асинхронные перечисления.

static async Task AwaitAndYieldReturnMain()
{
    await foreach (int item in AwaitAndYieldReturn())
        Console.WriteLine($"Got {item}");

    // same as:
    await using (IAsyncEnumerator<int> enumerator = AwaitAndYieldReturn().GetAsyncEnumerator())
    {
        while (await enumerator.MoveNextAsync())
        {
            int item = enumerator.Current;
            Console.WriteLine($"Got {item}");
        }
    }
}

await foreach разворачивается почти так же, как и синхронный foreach в случае с синхронными перечислениями, за исключением того, что в асинхронной версии вызывается метод GetAsyncEnumerator, а не GetEnumerator и MoveNextAsync вместо MoveNext.

Подробнее об асинхронных потоках

Таким образом, метод может остановиться в момент вызова yield и в момент вызова await. Иными словами асинхронные потоки объединяют ленивую природу enumerables и асинхронную сущность тасков. Разница только в том, что GetNextItem становится асинхронным, и называется он по-разному в зависимости от языка.

У Python и Java есть свои названия для GetNextItem, но под капотом во всех этих языках это работает очень похожим образом. В конечном итоге, MoveNextAsync() — это просто деталь реализации.

Раз уж речь зашла о других языках, нельзя не упомянуть Kotlin. Этот доклад хорошо дополняет «Асинхронно, но понятно. Сопрограммы в Kotlin». Андрей Бреслав рассказывает о реализации корутин и часто сравнивает примеры реализации в Kotlin и C#.

Где асинхронность?

Если мы взглянем на определения типов — в данном случае С# — в .NET у нас есть enumerator, который наследует IDisposable, на случай, если нужно освободить ресурсы. Далее — MoveNext, который перемещается на следующее значение, если возможно, и возвращает false, если больше элементов нет и current, чтобы получить текущий элемент.

Асинхронный тип очень похож. Также присутствуют current и MoveNextAsync, возвращающий ValueTask<bool>, к которому мы еще вернемся. Асинхронный enumerator реализует IAsyncDisposable — новый интерфейс в C# 8.0. Если вкратце, IAsyncDisposableнужны для асинхронных потоков, потому что у вас может быть try/catch или try/finally в блоке энумератора, внутри которого присутствует await.

Немного о ValueTask

О ValueTask можно думать как о более эффективном Task, особенно если значения таска синхронные. Но есть несколько ограничений.

Во-первых, обработать его можно один раз, с помощью await или использовать .AsTask, чтобы конвертировать его в Task, но сделать это можно лишь единожды. Дело в том, что ValueTask на самом деле — Value Type, который может быть перезаписан после чтения. Из-за этого люди часто путаются. Некоторые свойства тоже отличаются от своих аналогов в стандартном Task, например, Result. Это создает дополнительные проблемы, потому что ValueTask существует со времен .NET Core 2.2. И у него были определенные сценарии использования. Теперь появилась еще одна возможность. В данной ситуации .Result не блокирующий, если только по счастливой случайности он не оборачивает другой таск, после чего оно также по счастливой случайности завершит работу без ошибок, хотя код неверен.

В ValueTask нельзя вызвать .Result, пока он не закончит свою работу. Откровенно говоря, это деталь реализации, на которую не стоит полагаться. Если вы используете код, сгенерированный компилятором, для foreach await будет всегда действовать корректно и делать одно и то же в асинхронных потоках, он никогда не нарушит эти правила, но когда вы делаете это вручную, об этом стоит помнить.

Последнее, что я сегодня скажу о ValueTask: Марк Гравел (Mark Gravell) написал замечательный пост о том, что все должны использовать ValueTask, и я соглашусь с этим со временем. Сейчас я не уверен, потому что не все разработчики знают о подводных камнях, которые я описал ранее. Я уже слышу вопросы: «Я переписал свой код на ValueTask, вызвал Result, и код не работает». Да, не работает, потому что так нельзя делать в мире ValueTask, код скомпилируется, но не будет вести себя так, как вы думаете.

Пока разработчики не освоятся с ValueTask и не научатся обходить его подводные камни, я не могу советовать использовать его повсеместно. Команда Марка Гравела состоит из превосходных профессионалов своего дела, и они стараются улучшить производительность везде, где возможно. Для его команды использовать ValueTask целесообразно. Но применимо ли это ко всей экосистеме .NET? Думаю, пока нет.

ConfigureAwait

В демо мы с вами видели, что foreach и await foreach очень похожи друг на друга. Чего мы не видели, это как работает ConfigureAwait вместе с await foreach. Можно вызвать ConfigureAwait(false) в блоке await foreach, В таком случае он сгенерируется в await, используя ConfigureAwait(false), и тогда в async disposable тоже появляется ConfigureAwait(false), и MoveNextAsync тоже получает ConfigureAwait(false). Поэтому, если вы использовали его единожды в await foreach, то он появится во всём методе.

Где и как выставлять ConfigureAwait — вопрос неоднозначный. Пожалуй, его можно сравнить с вопросом «где и когда ставить volatile». Чтобы составить свое мнение, прочитайте Async/await в C#: подводные камни и посмотрите Async programming in .NET: Best practices.

Создание и использование асинхронных потоков

Асинхронные методы поддерживаются в различных языках, и каждый язык по-своему создает и поддерживает их, но фундаментальные принципы остаются теми же.

Обратите внимание, что в C# используется await foreach, а в JavaScript — foreach await. Вероятно, это станет источником бесчисленного количества косяков. Я сам уже пару раз так накосячил.

С другой стороны, есть и хорошие новости: исключения везде работают именно так, как вы ожидаете. Исключения тоже ленивые, как любой enumerable или generator, и их можно ловить с помощью try/catch. В catch-блоке вы увидите настоящее исключение, безо всяких оберток или других неожиданностей.

Сценарии использования

Я считаю, что самый распространенный случай использования асинхронных потоков — Paging API — API, возвращающий только определенное количество результатов. Здесь для этого я использую limit = 10, и offset = 0, обозначающий, с которого элемента нужно начать.

[ApiController]
[Route("api/[controller]")]
public class ValuesController : ControllerBase
{
    [HttpGet]
    public async Task<IReadOnlyCollection<int>> Get(int limit = 10, int offset = 0)
    {
        await Task.Delay(TimeSpan.FromSeconds(3));
        return Enumerable.Range(0, 13).Skip(offset).Take(limit).ToList();
    }
}

Взглянем, как это выглядит в коде, который вызывает этот метод.
Для начала с помощью await foreach вызовем наш асинхронный поток на сервере.

static async Task PagingApiMain()
{
    Console.WriteLine($"{DateTime.Now:hh:mm:ss} Starting...");
    await foreach (int item in PagingApi())
        Console.WriteLine($"{DateTime.Now:hh:mm:ss} Got {item}");
}

Я установлю pageSize = 5, хоть API и имеет лимит 10 по умолчанию. Мы получаем каждую страницу результатов, достаем их из JSON и возвращаем все результаты.

static async IAsyncEnumerable<int> PagingApi()
{
    // Handle the paging only in this function.
    // Other functions don't get polluted with paging logic.
    const int pageSize = 5;
    int offset = 0;

    while (true)
    {
        // Get next page of results.
        string jsonString = await HttpClient.GetStringAsync(
            $"http://localhost:53198/api/values?offset={offset}&limit={pageSize}");

        // Produce them for our consumer.
        int[] results = JsonConvert.DeserializeObject<int[]>(jsonString);
        foreach (int result in results)
            yield return result;

        // If this is the last page, then stop.
        if (results.Length != pageSize)
            break;

        // Index to the next page.
            offset += pageSize;
    }
}

Это типовой пример использования асинхронных потоков: суммарно мы загружаем двенадцать элементов, но запросов к серверу только три. Каждый запрос возвращает не более пяти элементов.

ASP.NET Core поддерживает возврат IAsyncEnumerable<T> из методов контроллеров начиная с пятой версии. В шестой устраняют буферизацию, что положительно скажется на потреблении памяти.

Пример неверного использования API-нотификаций

Асинхронные потоки не заменяют observable. Существуют сценарии, когда Observables подходят лучше, чем Async Streams.

Например, SignalRи веб-сокеты. Мы подключаемся и получаем сообщения. Не существует способа запросить данные. Сообщения приходят тогда, когда приходят. Отправку сообщений контролирует отправляющая сторона, а не принимающая. Другой пример: протоколы обмена биржевыми котировками. В них используются нечто вроде HTTP API, но не совсем.

Устанавливается обычное HTTP-соединение, после чего отправляется несколько ответов. Таким образом экономится время на постоянные подключения, отключения и отправку новых запросов. Словом, асинхронные потоки не лучший выбор для обмена сообщения, следующего логике подписка > несколько обновлений/запросов > отписка.

Для таких случаев больше подходит observable, потому что они push-based по умолчанию. Вы можете сделать их pull-based, если хотите. Даже преобразовать Observable к Async Stream, потому что Async Streams проще использовать, чем Observables. Для этого необходимо создать некий буфер, чтобы в него приходили сообщения, когда их отправляет поставщик, а асинхронный поток забирал оттуда элементы, когда ваш код вытягивает их из буфера. Буфер, который я всегда рекомендую, — System.Threading.Channels — это асинхронные очереди вида producer/consumer. Фактически вы подключаете push-based API напрямую к буферу. Остальной ваш код может асинхронно получать данные из буфера, вызывая ReadAllAsync. Каналы уже совместимы с асинхронными потоками, так что ReadAllAsync вернет IAsyncEnumerable<T>.

Асинхронный LINQ

Я бы хотел побольше поговорить о прикладных сценариях использования асинхронных потоков. Мы можем осуществлять преобразования над потоком, прежде чем вызвать foreach await. У LINQ есть множество применений: LINQ2Objects, LINQ2SQL, LINQ2Events (Observables), и теперь у нас есть еще LINQ2AsyncronousStreams. System.Linq.Async — это проект сообщества, а не Microsoft. Я не знаю, как они заполучили это пространство имен, тем не менее.

System.Linq.Async дает нам все, чем мы привыкли пользоваться в мире LINQ, но для асинхронных потоков.

static async IAsyncEnumerable<int> SlowRange()
{
        for (int i = 0; i != 10; ++i)
        {
            await Task.Delay(i * TimeSpan.FromSeconds(0.1));
            yield return i;
        }
}

Начнем со SlowRange. Он просто считает до 10, и каждый раз он делает задержку чуть длиннее. Это асинхронный поток, который походу времени замедляется.

Далее, я вызываю LINQ-метод .Where.

static async Task BasicLinqMain()
{
    Console.WriteLine($"{DateTime.Now:hh:mm:ss} Starting...");
    IAsyncEnumerable<int> query = SlowRange().Where(x => x % 2 == 0);
    await foreach (int item in query)
        Console.WriteLine($"{DateTime.Now:hh:mm:ss} Got {item}");
}

Здесь он находит все четные числа. Если мы запустим программу, то мы увидим, что она становится все медленнее и медленнее, пока не дойдет до конца ряда.

Передача асинхронных лямбд в LINQ

Но это еще не все. Link-to-Streams позволяет нам передавать асинхронные лямбды, а значит, мы можем передать асинхронный метод в where и count.

Мне нравится думать о работе WhereAwait так: WhereAwait будет ждать лямбду, которую вы передаете.

Кстати о count. Также есть несколько LINQ-операторов, которые возвращают async enumerable, а другие LINQ-операторы возвращают единственное значение. Терминальные операторы возвращают async, и их можно ожидать с помощью await, например, можно вызвать метод CountAsync, и он асинхронно посчитает количество элементов в потоке.

И если вы хотите усложнить еще больше, то можно использовать CountAwaitAsync, который может принять как аргумент асинхронный метод и вернуть асинхронное значение.

Очень часто задаваемый вопрос о LINQ и асинхронных запросах — происходит ли конкурентный доступ к данным? Пока мы получаем асинхронные значения, если я делаю WhereAwait или SelectAwait и говорю: «Иди посмотри это в API, дай мне дополнительное значение и верни уже его» каждый раз, когда получаю значение, то возможна ли гонка?

Ответ — нет, все элементы серийные, обработка происходит по одному элементу. Нет ни конкурентности, ни параллелизма.

Посмотрим, как это выглядят.

static async Task LinqWithAsyncLambdasMain()
{
    Console.WriteLine($"{DateTime.Now:hh:mm:ss} Starting...");
    IAsyncEnumerable<int> query = SlowRange().WhereAwait(async x =>
    {
        await Task.Delay(TimeSpan.FromSeconds(0.1));
        return x % 2 == 0;
    });

    await foreach (int item in query)
        Console.WriteLine($"{DateTime.Now:hh:mm:ss} Got {item}");
}

Здесь я передаю асинхронную лямбду в where, и это очень частый вопрос, который годами возникал у людей к LINQ: «Как я могу сделать асинхронный filter или асинхронный select?» И до сих пор ответом всегда было «никак», потому что раньше не существовало асинхронных потоков. Теперь это возможно. И если у вас есть асинхронный поток, например SlowRange, теперь мы можем вызвать .WhereAwait и передать ему асинхронный метод, и этот метод добавляет 0,1 секунды каждый раз при получении значения. Таким образом мы можем передавать асинхронные методы/лямбды в операторы, которые заканчиваются на await. Никому не нравится конвенция именования с постфиксами Await, но во избежании конфликтов с перегрузкой других методов остановились на таком варианте: WhereAsync.

И у нас есть терминальные методы, если мы хотим посчитать все эти элементы асинхронно, например CountAsync.

static async Task TerminalLingMethodMain()
{
    Console.WriteLine($"DateTime.Now:hh:mm:ss} Starting...");
    int result = await SlowRange().CountAsync(x => x % 2 == 0);
    Console.WriteLine($"{DateTime.Now:hh:mm:ss} Result: {result}");
}

Мы не передаем асинхронную лямбду, мы просто считаем четные результаты, но методу CountAsync нужно быть асинхронным, так как наш источник асинхронный.

Наконец, мы можем передать асинхронный метод оператору count, и в таком случае наша перегрузка будет невероятно неловкой: CountAwaitAsync. У этой перегрузки есть оба суффикса, потому что она ждет значение и возвращает значение вместо enumerable.

static async Task TerminalLinqMethodMain()
{
    Console.WriteLine($"{DateTime.Now:hh:mm:ss} Starting...");
    int result = await SlowRange().CountAsync(x => x % 2 == 0);
    Console.WriteLine($"{DateTime.Now:hh:mm:ss} Result: {result}");
}

static async Task TerminalLinqMethodWithAsyncLambdasMain()
{
    Console.WriteLine($"{DateTime.Now:hh:mm:ss} Starting...");
    int result = await SlowRange().CountAwaitAsync(async x =>
    {
        await Task.Delay(TimeSpan.FromSeconds(0.1));
        return x % 2 == 0;
    });

    Console.WriteLine($"{DateTime.Now:hh:mm:ss} Result: {result}");
}

Преобразование обычного LINQ

Еще можно взять обычный enumerable, то есть какой-нибудь существующий LINQ-метод, и использовать с ним асинхронную лямбду. Например, у нас есть несколько значений, может быть, даже список, то есть, что-то уже синхронное. И мы хотим посмотреть на каждое значение по одному, или отфильтровать их с помощью Where. Мы можем вызвать ToAsyncEnumerable, превращающий обычный enumerable в async enumerable. Он все еще синхронный, мы не меняем его природу, но теперь у него есть асинхронный API, который мы можем использовать со всеми расширенными LINQ-операторами, которые у нас есть для асинхронных потоков.

Скажем, у нашего кода есть вот такой enumerable, и он синхронный, это данные в памяти. Мы хотим применить асинхронный фильтр к синхронному enumerable. В данном случае я просто делаю асинхронную задержку, и отбираю только нечетные.

static async Task SuperchargeLinqMain()
{
    // I have this enumerable and want to pass an async lambda to Where.
    IEnumerable<int> query = Enumerable.Range(0, 10);

    IAsyncEnumerable<int> asyncQuery = query.ToAsyncEnumerable()
        .WhereAwait(async x =>
        {
            await Task.Delay(TimeSpan.FromSeconds(0.5));
            return x % 2 == 0;
        });

    Console.WriteLine($"{DateTime.Now:hh:mm:ss} Starting...");
    await foreach (int item in asyncQuery)
        Console.WriteLine($"{DateTime.Now:hh:mm:ss} Got {item}");
}

Мы вызываем .ToAsyncEnumerable, чтобы он «поднял» (lift) этот enumerable в асинхронный поток, после чего идет дополнительный оператор, который мы можем использовать, — перегрузка WhereAwait, куда мы передаем асинхронную лямбду.

Начиная с раздела о LINQ для Async Streams Стивен пару раз произносит слово на букву М. В процессе расшифровки мы удалили эти упоминания, но здесь снова появился в оригинале lift… В общем, если хотите разобраться во всем этом, то читайте серию статей The «Map and Bind and Apply, Oh my!» series Скотта Влашина.

Отмена асинхронных потоков

Давайте поговорим об отмене запросов. Это частный случай для C#. Всё, что мы обсудили до этого, существовало и в других языках в какой-то степени. Все, что последует далее, применимо только к C#.

Для поддержки отмены операции (cancellation) в асинхронном enumerable, мы используем CancellationToken, как в любом другом API в .NET. Отличие состоит в том, что теперь нужно добавить к нему новый атрибут EnumeratorCancellation. Я чуть позже я объясню почему. Microsoft Visual Studio напомнит вам о том, что этот атрибут необходим, если вы забыли, так что можете сильно не переживать. Непонятно, почему Microsoft реализовала автоматические предупреждения, но не добавила этот атрибут автоматически.

Чтобы отменить асинхронный поток, мы можем либо передать CancellationToken в метод напрямую, что более естественно, либо сделать это более замороченным способом — с помощью нового расширенного метода WithCancellation. Основная причина для этого заключается в том, что на самом деле отменить можно enumerator, а не enumerable. Посмотрим на это наглядно в коде.

Взглянем сперва на простую отмену. У нас всё еще остался старый SlowRange, который теперь можно отменить. CancellationToken работает так же, как он работал всегда, добавляется только атрибут. Далее я использую токен, который активируется через три секунды, и пройдусь по значениям асинхронно.

static async IAsyncEnumerable<int> CancelableSlowRange(
    [EnumeratorCancellation] CancellationToken token = default)
{
    for (int i = 0; i != 10; ++i)
    {
        await Task.Delay(i * TimeSpan.FromSeconds(0.1), token);
        yield return i;
    }
}

#region Cancellation
static async Task SimpleCancellationMain()
{
    using CancellationTokenSource cts = new CancellationTokenSource(TimeSpan.FromSeconds(3));
    Console.WriteLine($"{DateTime.Now:hh:mm:ss} Starting...");
    await foreach (int item in CancelableSlowRange(cts.Token))
        Console.WriteLine($"{DateTime.Now:hh:mm:ss} Got {item}");
}

Мы успели дойти до седьмого элемента, а потом сработала отмена. TaskCancelException работает так же, как и любое другое исключение в асинхронных потоках, ровно то, что мы могли бы ожидать от отмены. Когда я вызываю CancellableSlowrange, я передаю токен напрямую в Task.Delay.

Зачем же нам нужен атрибут? Это важно в более сложных и продвинутых случаях отмены. Скажем, у меня есть собственный оператор, который берет всё из этой последовательности и выводит на консоль, но делает это только три секунды.

static async Task ConsumeSequenceWithTimeout(IAsyncEnumerable<int> items)
{
    using CancellationTokenSource cts = new CancellationTokenSource(TimeSpan.FromSeconds(3));
    await foreach (int item in items.WithCancellation(cts.Token))
        Console.WriteLine($"{DateTime.Now:hh:mm:ss} Got {item}");
}

static async Task ComplexCancellationMain()
{
    Console.WriteLine($"{DateTime.Now:hh:mm:ss} Starting...");
    await ConsumeSequenceWithTimeout(CancelableSlowRange());
}

Сюда можно передать любую последовательность, любой асинхронный поток. Это оператор, которому все равно, какой генератор или какой enumerable вы ему передаете, он хочет просто взять всё оттуда всего на три секунды, поэтому он использует extension-метод WithCancellation.

Когда это видит компилятор, он говорит: «Окей, у тебя есть этот метод, в таком случае, я возьму токен, который ты в него передаешь, и передам его в твой [EnumeratorCancellation]», и находит он этот аргумент именно благодаря этому атрибуту.

Когда мы обрабатываем await в методе ComplexCancellationMain, он говорит: «Окей, дай мне CancellableSlowRange, который не передает CancellationToken», потому что он не знает его, ведь обрабатываемый код не обязан знать о том, какой там таймаут. Об этом беспокоится другой метод.

Передавать контекст отмены от метода к методу нам как раз позволяет атрибут, описанный выше. CanselSlowRange() создает enumerable, который не отменяется. Поэтому было бы нелогично передавать ему CancellationToken. Мы просто говорим ему вернуть асинхронный enumerable, после чего await foreach в ConsumeSequenceTimeout использует асинхронный энумератор, и он уже может сказать использовать CancellationToken. В таких ситуациях, когда есть энумератор, который нужно отменить, этот атрибут играет очень важную роль.

Если понравился этот доклад с DotNext — обратите внимание, что 21 и 22 октября, состоится DotNext 2021 Moscow. На сайте уже есть описания многих докладов, а среди спикеров будут Скотт Влашин и Рафаэль Риальди, полюбившиеся зрителям предыдущих DotNext. Программа и билеты — на сайте.

Источник 📢