Сопрограммы в Python
Генераторы
Любой более-менее приличный программист на Python знает, что есть такая замечательная штука, как функции-генераторы. Главная их особенность — это сохранение состояния между вызовами.
Я подразумеваю что вы знаете как они работают, поэтому просто напомню, как это выглядит.
Возьмём вот такую функцию:
Эта функция принимает на вход имя файла и возвращает его строчка за строчкой, не загружая целиком в память, что может быть необходимо при чтении больших файлов.
Такой приём называют ленивым (lazy) чтением, подразумевая, что мы не делаем «работу» без необходимости.
В общем случае, работа с генераторами выглядит следующим образом:
Естественно, чаще мы читаем значения из генератора в цикле, а не построчно.
Вот таким нехитрым способом мы можем получить все уникальные строки из сколь угодно большого файла:
Так же возможна короткая запись генератора:
Похоже на списковые выражения, верно? Только не требует создания всего списка range(0, 100*10000) в памяти, возвращаемое значение «вычисляется» каждый раз при обращении.
Сопрограммы как частный случай генераторов
А теперь о том, ради чего это, собственно, затевалось. Оказывается, генератор может не только возвращать значения, но и принимать их на вход.
О стандарте можно почитать тут PEP 342.
Предлагаю сразу начать с примера. Напишем простую реализацию генератора, который может складывать два аргумента, хранить историю результатов и выводить историю.
Т.е. мы создали генератор, проинициализировали его и подаём ему входные данные.
Он, в свою очередь, эти данные обрабатывает и сохраняет своё состояние между вызовами до тех пор пока мы его не закрыли. После каждого вызова генератор возвращает управление туда, откуда его вызвали. Это важнейшее свойство генераторов мы и будем использовать.
Так, с тем, как это работает, вроде, разобрались.
Давайте теперь избавим себя от необходимости каждый раз руками инициализировать генератор.
Решим это типичным, для питона, образом, с помощью декоратора.
На этом примере можно понять как писать свои более сложные (и полезные) сопрограммы.
Заключение.
Хоть проблемы, которые можно решить этим инструментом затрагивают очень многие области (такие как асинхронное программирование), многие разработчики предпочитают более привычные инструменты ООП. Но при этом сопрограммы могут быть очень полезным инструментом в вашем арсенале, поскольку они достаточно наглядны, а создание фунций более дешёвая операция по сравнению с созданием объекта класса.
Да и определённый академический интерес они представляют, как мне кажется.
Вот такая вот первая статья.
Корутины и задачи¶
В этом разделе приведено высокоуровневое API asyncio для работы с корутинами и задачами.
Корутины¶
Заметим, что простой вызов корутины не приведёт к её выполнению:
Чтобы фактически запустить корутину, asyncio предоставляет три основных механизма:
Функция asyncio.run() для запуска функции точки входа верхнего уровня «main()» (см. приведённый пример выше)
Ожидающая корутина. Следующий фрагмент кода напечатает «hello» после ожидания в 1 секунду, а затем напечатает «world» после ожидания в течении ещё 2х секунд
Давайте изменим приведённый выше пример и запустим две say_after корутины конкурентно:
Обратите внимание, что ожидаемые выходные данные теперь показывают, что фрагмент выполняется на 1 секунду быстрее, чем ранее:
Ожидаемые объекты¶
Мы говорим, что объект является ожидаемым объектом, если его можно использовать в await выражении. Многие API-интерфейсы asyncio предназначены для приёма ожидаемых. Существует три основных типа ожидаемых объектов: корутины, задачи и футуры.
Python корутины являются ожидаемыми и поэтому могут ожидаться из других корутин:
В этой документации термин «корутина» может использоваться для двух тесно связанных понятий:
asyncio также поддерживает устаревшие основанные на генераторах корутины.
Задачи используются для конкурентного планирования корутин.
Когда объект Футуры ожидается, это означает, что корутина будет ждать, пока Футура будет решена в каком-то другом месте.
Объекты Футуры в asyncio нужны, чтобы позволить основанному на колбэках коду использоваться с async/await.
Обычно нет нужды создавать объекты Футуры на уровне кода приложения.
Объекты Футуры, иногда раскрываемые библиотеками и некоторыми asyncio API, могут быть ожидаемыми:
Запуск asyncio программы¶
Выполняет корутину coro и возвращает результат.
Функция управляет переданной корутиной, заботясь об управлении asyncio событийного цикла и завершения асинхронных генераторов.
Функция не может быть вызвана, когда в том же потоке выполняется другой asyncio событийный цикл.
Функция всегда создаёт новый событийный цикл и закрывает его в конце. Его следует использовать в качестве основной точки входа для asyncio программ, и в идеале его следует вызывать только один раз.
Добавлено в версии 3.7.
Исходный код asyncio.run() можно найти в Lib/asyncio/runners.py.
Создание задач¶
Обёртывание coro корутины в Task и запланировать её выполнение. Возвращает объект задачи.
Эта функция была добавлена в Python 3.7. Ранее Python 3.7 вместо неё можно использовать низкоуровневую функцию asyncio.ensure_future() :
Добавлено в версии 3.7.
Блокировка на delay секунд.
Если result предоставляется, он возвращается вызывающему после завершения корутины.
sleep() всегда приостанавливает выполнение текущей задачи, позволяя выполнять другие задачи.
Устарело с версии 3.8, будет удалено в 3.10 версии.: Параметр loop.
Пример корутины, отображающей текущую дату каждую секунду в течение 5 секунд:
Конкурентный запуск задач¶
Запускает ожидаемые объекты в последовательности aws конкурентно.
Если какой-либо ожидаемый объект в aws является корутиной, он автоматически назначается как задача.
Если все await объекты выполнены успешно, результатом является сводный список возвращенных значений. Порядок значений результата соответствует порядку await в aws.
При return_exceptions True исключения обрабатываются так же, как и успешные результаты, и агрегируются в списке результатов.
Если gather() отменён, все представленные ожидаемые (которые ещё не завершены) также будут отменены.
Если какая-либо задача или футура в последовательности aws отменена, это рассматривается, как будто сработало исключение CancelledError — вызов gather() не отменяется в этом случае. Это необходимо для предотвращения отмены одной отправленной задачи/футуры, чтобы привести к отмене других задач/футур.
Устарело с версии 3.8, будет удалено в 3.10 версии.: Параметр loop.
Если return_exceptions содержит значение False, отмена gather() после того, как он был помечен как выполненный, не отменит ни одного отправленного ожидаемого объекта. Например, gather может быть помечена как выполненная после передачи исключения вызывающей стороне, поэтому вызов gather.cancel() после перехвата исключения (вызванного одним из ожидаемых объектов) из gather не отменяет другие ожидаемые объекты.
Изменено в версии 3.7: Если gather отменяется, отмена распространяется независимо от return_exceptions.
Защита от отмены¶
Если aw корутина, она автоматически назначается как задача.
Если требуется полностью игнорировать отмену (не рекомендуется), функция shield() должна быть объединена с предложением try/except следующим образом:
Устарело с версии 3.8, будет удалено в 3.10 версии.: Параметр loop.
Таймауты¶
Дождаться завершения aw ожидаемого с таймаутом.
Если aw корутина, она автоматически назначается как задача.
Функция будет ждать, пока футура будет фактически отменена, поэтому общее время ожидания может превысить timeout.
Если ожидание отменяется, то также отменяется и будущий aw.
Устарело с версии 3.8, будет удалено в 3.10 версии.: Параметр loop.
Примитивы ожидания¶
Конкурентный запуск ожидаемых объектов в итерации aws и блокировка, пока не будет выполнено условие, указанное в return_when.
timeout (float или int), если он указан, можно использовать для управления максимальным количеством секунд ожидания перед возвращением.
return_when указывает, когда функция должна возвращать. Она должна быть одной из следующих констант:
Устарело с версии 3.8, будет удалено в 3.10 версии.: Параметр loop.
Вот как можно зафиксировать вышеуказанный фрагмент:
Не рекомендуется, начиная с версии 3.8: Передача корутиновых объектов непосредственно в wait() устарела.
Запуск ожидаемых объектов в aws итеративно и конкурентно. Возвращает итератор корутины. Каждую возвращенную корутину можно ожидать, чтобы получить самый ранний следующий результат от итерируемого из оставшихся ожидаемых.
Устарело с версии 3.8, будет удалено в 3.10 версии.: Параметр loop.
Планирование из других потоков¶
Отправить корутину в событийный цикл. Потокобезопасный.
Возвращает concurrent.futures.Future дожидаясь результата из другого потока ОС.
Эта функция вызывается из потока операционной системы, отличного от того, где выполняется событийный цикл. Пример:
Если в корутине возникает исключение, возвращенная футура будет уведомлена. Также можно использовать отмену задачи в событийном цикле:
В отличие от других asyncio функций эта функция требует явной передачи loop аргумента.
Добавлено в версии 3.5.1.
Интроспекция¶
Добавлено в версии 3.7.
Добавлено в версии 3.7.
Объект задачи¶
Задачи используются для запуска корутин в событийных циклах. Если корутина ожидает футуры, задача приостанавливает выполнение корутины и ожидает завершения футуры. Когда выполнено, футура возобновляет исполнение обёрнутой корутины.
Событийный цикл использует совместное планирование: событийный цикл выполняет одну задачу одновременно. В то время как задача ожидает для завершения футуры, событийный цикл выполняет другие задачи, колбэки или выполняет операции ввода- вывода.
Устарело с версии 3.8, будет удалено в 3.10 версии.: Параметр loop.
Запрос отмены задачи.
Это позволяет создать CancelledError исключение для обернутой корутины на следующем цикле событийного цикла.
В следующем примере показано, как корутины могут перехватывать запрос на отмену:
Задача отменена, когда запрашивалась отмена с cancel() и обернутая корутина распространила в неё CancelledError исключение.
Задача завершена, когда обернутая корутина либо возвратила значение, либо вызвала исключение, либо задача была отменена.
Возвращает результат выполнения задачи.
Если задача является завершенной, то результатом обернутой корутины является возвращаемое (или если корутина вызвала исключение, то это исключение возникает повторно.)
Если задача была отменена, это метод вызывает CancelledError исключение.
Если результат задачи ещё не доступен, это метод вызывает InvalidStateError исключение.
Возвращает исключение задачи.
Если задача была отменена, это метод вызывает CancelledError исключение.
Если задача еще не завершена, это метод вызывает InvalidStateError исключение.
add_done_callback ( callback, *, context=None ) ¶
Добавление колбэка для выполнения при выполнении задачи.
Этот метод должен быть использован только в низкоуровневом основанном на колбэках коде.
Удалить callback из списка колбэков.
Этот метод должен быть использован только в низкоуровневом основанном на колбэках коде.
Возвращает список фреймов стека для этой задачи.
Если обёрнутая корутина не выполнена, он возвращает стек, где он приостановлен. Если короутина успешно завершена или отменена, возвращает пустой список. Если корутина была прервана исключением, возвращает список кадров трейсбэка.
Фреймы всегда упорядочиваются от самых старых до самых новых.
Для приостановленной корутины возвращается только одни фрейм стека.
Необязательный аргумент limit устанавливает максимальное количество возвращаемых кадров; по умолчанию возвращаются все доступные фреймы. Порядок возвращаемого списка различается в зависимости от того, возвращается ли стек или трассировка: возвращаются самые новые фреймы стека, но возвращаются самые старые фреймы трассировки. (Это соответствует поведению модуля трассировки.)
Печать стека или трейсбэка для этой задачи.
Добавлено в версии 3.8.
Возвращает имя задачи.
Если ни одно имя не было явно назначено задаче, реализация задачи asyncio по умолчанию создаёт имя по умолчанию во время создания экземпляра.
Добавлено в версии 3.8.
Задание имя задачи.
Аргументом value может быть любой объект, который затем преобразуется в строку.
В реализации задачи по умолчанию имя будет отображаться в repr() выходных данных объекта задачи.
Добавлено в версии 3.8.
Возвращает множество всех задач для событийного цикла.
Основанные на генераторах корутины¶
Поддержка основанных на генераторах корутин запрещено и планируется к удалению в Python 3.10.
Корутины на основе генератора предшествовали синтаксису async/await. Они представляют собой Python генераторы, которые используют yield from выражения для ожидания футур и других корутин.
Декоратор для маркировки основанных на генераторах корутин.
Этот декоратор обеспечивает совместимость устаревших основанных на генераторах корутин с async/await кодом:
Этот декоратор не должен использоваться для async def корутин.
Устарело с версии 3.8, будет удалено в 3.10 версии.: Используйте async def вместо этого.
Метод отличается от inspect.iscoroutine() потому что возвращает True для основанных на генераторах корутин.
asyncio. iscoroutinefunction ( func ) ¶
Паттерны корутин asyncio: за пределами await
Предисловие переводчика:
В очередной раз наступив на грабли при работе с python asyncio я отправился на просторы интернета, чтобы найти что-то более приятное, чем сухая документация. Мне попалась статья Yeray Diaz «Asyncio Coroutine Patterns: Beyond await», в которой автор весьма увлекательно рассматривает применение asyncio и делится некоторыми приемами. Поскольку я не нашел ничего такого же цельного на русском языке, то решился её перевести.
Asyncio — конкурентная мечта python программиста: пишешь код, граничащий с синхронным, и позволяешь Python сделать все остальное. Это очередной импорт библиотеки антигравитации: import antigravity
На самом деле все совсем не так, конкурентное программирование — тяжелое занятие и, пока корутины позволяют нам избегать ада обратных вызовов, что может увести вас достаточно далеко, вам все еще нужно думать о создании задач, получении результатов и элегантном перехвате исключений. Печально.
Хорошие новости в том, что все из этого возможно в asyncio. Плохие новости в том, что не всегда сразу очевидно что неправильно и как это исправить. Ниже несколько паттернов, которые я обнаружил во время работы с asyncio.
Прежде чем мы начнем:
Я использовал любимую библиотеку aiohttp для выполнения асинхронных HTTP запросов и Hacker News API, потому что это простой и хорошо известный сайт, который придерживается знакомого сценария использования. Следуя отклику на мою предыдущую статью, я также использовал async/await синтаксис введенный в Python 3.5. Я предполагал, что читатель знаком с идеями, которые здесь описаны. И в конечном счете, все примеры доступны в GitHub репозитории этой статьи.
Хорошо, давайте начнем!
Рекурсивные корутины
Создание и запуск задач тривиально в asyncio. Для таких задач API включает несколько методов в классе AbstractEventLoop, а также функции в библиотеке. Но обычно вы хотите комбинировать результаты от этих задач и обрабатывать их каким-то образом, и рекурсия — это отличный пример данной схемы, а также демонстрирует простоту корутин в сравнении с остальными средствами конкурентности.
Обычный случай для использования asyncio — это создание вебкраулера какого-то вида. Представьте, что мы просто слишком заняты чтобы проверять HackerNews, или может быть вам просто нравится хороший холивар, так что вы хотите реализовать систему, которая извлекает число комментариев для определенного постна HN и, если оно выше порога, уведомляет вас. Вы немного погуглили и нашли документацию на HN API, просто то что нужно, однако вы заметили в документации следующее:
Хотите узнать общее число комментариев статьи? Обойдите дерево и сосчитайте их.
Не стесняйтесь попробовать запустить скрипт с флагом “ —verbose” для более детального вывода.
Давайте пропустим шаблонный код и перейдем прямо к рекурсивной корутине. Заметим, что этот код читается практически полностью так, как было бы в случае с синхронным кодом.
Это отличный пример того, что Бретт Слаткин описывает как fan-in и fan-out, мы fan-out чтобы получить данные от наследников и fan-in сводим полученные данные, чтобы расчитать число комментариев
В API asyncio есть пару способов для того, чтобы выполнять эти fan-out операции. Здесь я использую функцию gather, которая эффективно ожидает пока все корутины выполнятся и вернут список своих результатов.
Обратим внимание на то, как использование корутины также хорошо соответствует рекурсии с одной любой точкой, в которой присутствует любое число корутин, ожидающих ответы на свои запросы во время вызова функции gather и возобновляющих выполнение после того, как операция ввода/вывода завершится. Это позволяет нам выразить довольно сложное поведение в одной изящной и (легко) читаемой корутине.
«Очень просто» — скажете вы? Ладно, давайте поднимемся на ступеньку выше.
Выстрелил и забыл
Представьте, что вы хотите отправлять себе сообщение на электронную почту с постами, у которых количество комментариев больше определенного значения, и вы хотите сделать это таким же способом как мы обходили дерево постов. Мы можем просто добавить выражение «if» в конец рекурсивной функции чтобы добиться этого:
Да, я использовал asyncio.sleep. Это в последний раз. Обещаю.
Это значительно медленнее чем раньше!
Причина в том, что, как мы обсуждали ранее, await приостанавливает выполнение корутины до тех пор, пока future не будет выполнена, но, поскольку нам не нужен результат логирования, нет реальной причины поступать таким образом.
Нам нужно «выстрелить и забыть» нашей корутиной, а поскольку мы не можем ждать ее завершения используя await, нам нужен другой путь для запуска выполнения корутины без ее ожидания. Быстро взглянув на API asyncio найдем функцию ensure_future, которая запланирует запуск корутины, обернет её в объект Task и вернёт. Помня, что раньше корутина была запланирована, цикл событий будет контролировать результат работы нашей корутины в какой-то момент в будущем, когда другая корутина будет в состоянии ожидания.
Здорово, давайте заменим await log_post следующим образом:
Проблема в том, что мы принудительно закрываем цикл событий после того, как получаем результат работы корутины post_number_of_comments, не оставляя нашей задаче log_post времени для завершения.
У нас есть две возможности:
мы либо позволяем циклу событий работать бесконечно, используя run_forever, и вручную прерываем работу скрипта, или мы используем метод all_tasks класса Task для того, чтобы найти все работающие задачи и ждем когда закончится расчет количества комментариев.
Давайте попробуем выйти из этой ситуации быстро внеся изменения после нашего вызова post_number_of_comments:
Сейчас мы уверены, что логирующие задачи завершены.
Предположение, что метод all_tasks отлично работает в случаях, с которыми мы имеем дело, — отличная идея, когда задачи выполняются соответствующим образом в нашем цикле событий, но в более сложных случаях может быть любое количество выполняющихся задач, источник которых может находится за пределами нашего кода.
Другой подход — навести порядок после того, как мы самостоятельно регистрируем абсолютно все корутины, которые мы планировали запустить и позволяем выполнится, отложенным ранее,
как только завершится подсчет комментариев. Как вы знаете функция ensure_future возвращает объект Task, Мы можем использовать это для регистрации наших задач с низким приоритетом. Давайте просто определим список task_registry и сохраним в нем futures:
Мы извлекаем следующий урок — asyncio не следует рассматривать как распределенную очередь задач типа Celery. Все задачи запускаются в одном потоке и циклом событий необходимо управлять соответственно позволяя выделить время для завершения задач.
Что приводит к другому общепринятому паттерну:
Периодически запускаемые корутины
Продолжая с нашим примером о HN(и мы ранее провели отличную работу), мы решили,
что решительно важно рассчитывать число комментариев к публикации HN как только они становятся доступны и пока они находятся в списке 5 последних записей.
Быстрый взгляд на API HN показывает конечную точку, которая возвращает 500 последних записей. Отлично, так мы можем просто опрашивать эту конечную точку для получения новых публикаций и расчета числа комментариев к ним, скажем раз в пять секунд.
Хорошо, поскольку теперь мы переходим к периодическому опросу, мы можем просто использовать бесконечный while цикл, ожидать выполнения задачи опроса (вызывать await), и засыпать(вызывать sleep) на необходимый промежуток времени. Я внес несколько незначительных изменений чтобы получить топ записей вместо обращения по непосредственному URL поста.
Отлично, но есть незначительная проблема: если вы обратили внимание на временную отметку,
то задача не запускается строго раз в 5 секунд, она запускается через 5 секунд после того как завершится выполнение get_comments_of_top_stories. Снова последствия использования await и блокирование пока мы не получим наши результаты обратно.
Эти особенности не создают проблемы в случае, когда задаче требуется больше времени чем пять секунд. Также, кажется ошибочным использовать _run_untilcomplete когда корутина спроектирована как бесконечная.
Хорошие новости в том, что теперь мы эксперты по ensure_future, и можем просто впихнуть ее в код вместо использования await.
Кхм… Ладно, хорошие новости в том, что временная отметка располагается точно через пять секунд, но что за 0.00 секунд и нет выборок И затем следующая итерация занимает ноль секунд и 260 выборок?
Это одно из последствий ухода от await, теперь мы больше не блокируем корутину и просто переходим на следующую строку, которая печатает ноль секунд и, в первый раз, ноль извлеченных сообщений. Это довольно мелкие задачи, поскольку мы можем жить без сообщений, но что если нам нужны результаты выполнения задач?
Тогда, мой друг, нам нужно прибегнуть к… callback-ам (поеживаемся((( )
Я знаю, знаю, весь смысл корутин в том, чтобы избежать callback — ов, но это потому, что драматический подзаголовок статьи — «За пределами await». Мы больше не на территории await, у нас приключения с ручным запуском задач что приводит к нашему сценарию использования. Что это вам дает? spoiler
Как мы обсуждали ранее ensure_future возвращает объект Future, к которому мы мы можем добавить callback используя _add_donecallback.
Прежде чем мы это сделаем, и чтобы иметь корректный подсчет выборок(fetches) мы приходим к тому, что мы должны инкапсулировать нашу корутину извлечения в класс URLFetcher. В таком случае создаем экземпляр для каждой задачи, чтобы у нас был корректный подсчет выборок. Также удаляем глобальную переменную, которая все равно вносила баг:
Хорошо, уже лучше, но давайте сфокусируемся на секции callback:
Обратим внимание на то, что callback функции необходимо принимать один аргумент, в котором передается сам объект future. Мы также возвращаем количество выборок(fetch) из экземпляра URLFetcher как результат _get_comments_of_topstories и получаем эти данные как результат future.
Видите? Я говорил вам, что это будет неплохо, зато здесь точно нет await.
Пока мы обсуждаем callback-и, при неизбежных скитаниях по документации API asyncio вам посчастливилось найти пару методов в AbstractBaseLoop с именами _calllater и _callat,
которые выглядят как что-то удобное для реализации периодической корутины. И вы будете правы, их можно использовать, нам просто нужно внести изменения в poll_top_stories_for_comments:
Результаты аналогичны полученным ранее. Обратим внимание на несколько изменений:
Ладно, это все хорошо и даже превосходно, но, что если Бог недоступен — наше соединение разорвалось посередине выполнения задачи? Что произойдет с нашей замечательной системой? Давайте проведем такую симуляцию добавив выброс исключения после нескольких запросов URL:
Не так здорово, правда?
Что делать? Переходить к следующей части этой серии, где я исследую возможности, которые у нас есть для обработки ошибок и остальных задач: Паттерны asyncio корутин: Ошибки и Отмена



