Введение

В данной статье, которая является второй из цикла об асинхронном программировании с использованием сопрограмм в Python, мы рассмотрим их классическую реализацию, доступную ещё с Python версии 2.5, при помощи расширенных возможностей генераторов (PEP 342).

 

Сопрограммы на основе генераторов Python


 

В Python есть механизм, который создан для удобного описания итераторов: генераторы (generators).

Функцией-генератором (generator function) называется функция, которая может отдавать очередное значение при помощи ключевого слова yield и автоматически сохраняет и возобновляет своё состояние при получении следующего значения. При вызове данная функция возвращает итератор, который называется итератором генератора (generator iterator) или объектом генератора (generator object). Под генератором в зависимости от контекста понимают либо функцию-генератор, либо итератор генератора.

Пример:

Примечание: исходный код всех примеров доступен на GitHub: https://github.com/aqrln/itvdn-blog-async-python-examples

 

def fibonacci():

    a, b = 0, 1

    while True:

        yield b

        a, b = b, a + b

 

Данный генератор представляет собой бесконечную последовательность чисел Фибоначчи.

Создадим объект генератора:

 

fibonacci_sequence = fibonacci()

 

После этого можно последовательно получать числа Фибоначчи путём вызова

 

print(next(fibonacci_sequence))

(Встроенная функция next вызывает метод __next__ (next в Python 2) объекта-генератора.)

 

Или, например, получим список пар номеров и значений первых n чисел Фибоначчи:

 

print(list(zip(range(1, n + 1), fibonacci())))

 

Генераторы являются частным случаем сопрограмм, также называемым semicoroutines (дословно – полусопрограммы). В отличие от классических сопрограмм, которые могут передавать управление в произвольную сопрограмму, генераторы могут передавать его лишь в место вызова метода __next__ (или send, но об этом позже), однако, для реализации асинхронных функций нам это и нужно: сопрограммы будут возвращать управление в цикл событий.

В Python 3 существуют так называемые подгенераторы (subgenerators). Если в функции-генераторе встречается пара ключевых слов yield from, после которых следует объект-генератор, то данный генератор делегирует доступ к подгенератору, пока он не завершится (не закончатся его значения), после чего продолжает своё исполнение. Кроме того, yield from на самом деле является не оператором, а выражением, результат которого равен тому значению, которое функция-генератор возвращает при помощи обычного оператора return (оно сохраняется в исключении StopIteration, которое возникает при завершении генератора).

Этого уже достаточно для того, чтобы реализовать асинхронное выполнение кода на основе цикла событий, и именно так реализован модуль asyncio в Python 3.4. Однако, для реализации сопрограмм в общем случае у генераторов в Python есть ещё один полезный метод: send(). Он отправляет в генератор значение, которому будет равно yield-выражение генератора (да, yield тоже является выражением, а не оператором). При помощи этих средств уже можно реализовать полноценные сопрограммы.

Давайте рассмотрим пример программы, в основе которой лежат две сопрограммы: одна генерирует данные (её в таком случае называют producer), а другая – обрабатывает (consumer).

Примечание: здесь и далее примеры кода написаны на Python 3

 

import time

import random

 

 

def sleep(seconds):

    """Сопрограмма, которая приостанавливает сопрограмму,

    из которой была вызвана, на заданное количество секунд"""

 

    initial_time = time.time()

    while time.time() - initial_time < seconds:

        yield

 

 

def consume():

    """Сопрограмма обработки данных"""

 

    running_sum = 0

    count = 0

 

    while True:

        data = yield

        running_sum += data

        count += 1

        print('Got data: {}\nTotal count: {}\nAverage: {}\n'.format(

            data, count, running_sum / count))

 

 

def produce(consumer):

    """Сопрограмма выдачи данных.

    Каждые полсекунды генерирует случайное число.

    """

 

    while True:

        yield from sleep(0.5)

        data = random.randint(0, 100)

        consumer.send(data)

        yield

 

 

def main():

    # Создание обработчика данных

    consumer = consume()

    Запуск сопрограммы

    consumer.send(None)

 

    # Создание производителя данных

    producer = produce(consumer)

 

    Цикл событий (event loop)

    while True:

        next(producer)

 

 

if __name__ == '__main__':

    main()

 

Здесь функция-генератор produce – это producer-сопрограмма, consume – consumer-сопрограмма, sleep вспомогательная сопрограмма, которая используются сопрограммой consume для приостановки выполнение своего «потока» на заданное время.

В начале работы программы создаются объекты-генераторы consumer и producer и запускается сопрограмма consumer. Её необходимо запустить заранее (либо послав её значение None, либо вызвав next(consumer), что, на самом деле, одно и то же), так как единственным значением, которое можно отправить в генератор, пока он не запущен, является None. Причина этого в том, что значение, которое передаётся при помощи метода send(), становится значением того yield-выражения, при помощи которого генератор вернул управление, и если он ещё не запущен, то данное значение некуда присваивать.

Затем запускается некоторое подобие цикла событий (с единственным возможным событием: генерация новой порции данных). Если бы в Python на уровне языка была поддержка полноценных сопрограмм с возможностью передачи управления в заданную сопрограмму, то можно было бы обойтись без него, но генераторы могут передавать управление лишь в вызывающую функцию, поэтому требуется сущность, которая управляет выполнением сопрограмм. Ей и является этот цикл.

Рассмотрим сопрограмму consume. После инициализации в бесконечном цикле она повторяет следующие действия: ожидание данных при помощи yield-выражения (и, поскольку yield отдаёт управление циклу событий, в то время, пока данная сопрограмма находится в состоянии ожидания, может выполняться другой код) и их обработка: увеличение счётчиков и вывод на экран полученного числа, общего количества полученных чисел и их среднего значения.

Внутри сопрограммы produce тоже находится бесконечный цикл. На каждой итерации она отдаёт управление сопрограмме sleep, которая просто отдаёт управление циклу событий через сопрограмму produce, пока не пройдёт заданный промежуток времени, а затем генерирует случайное число и отправляет его объекту-генератору consumer при помощи метода send().

Таким образом, мы реализовали параллельное исполнение двух функций внутри одного потока. Однако, в отличие от настоящей многопоточности, функции сами решают, когда им следует переключиться.

 

 Командная строка

 

Давайте сравним это с абсолютно аналогичной программой, построенной на классических потоках:

 

import time

import random

from threading import ThreadCondition

 

 

class Consumer(Thread):

    """Класс потока – обработчика данных"""

 

    def __init__(self):

        """Конструктор класса"""

 

        super().__init__()

        self.condition = Condition()

        self.received_data = None

 

    def run(self):

        """Код, исполняемый в потоке"""

 

        running_sum = 0

        count = 0

 

        while True:

            with self.condition:

                # Ожидание доступности данных

                self.condition.wait()

                Получение данных

                data = self.received_data

 

            Обработка данных

            running_sum += data

            count += 1

            print('Got data: {}\nTotal count: {}\nAverage: {}\n'.format(

                data, count, running_sum / count))

 

    def send(selfdata):

        """Метод отправки данных в обработчик"""

        self.received_data = data

 

 

class Producer(Thread):

    """Класс потока – производителя данных"""

 

    def __init__(selfconsumer):

        """Конструктор класса"""

 

        super().__init__()

        self.consumer = consumer

        self.condition = consumer.condition

 

    def run(self):

        """Код, выполняемый в данном потоке"""

 

        while True:

            data = random.randint(0, 100)

            with self.condition:

                self.consumer.send(data)

                self.condition.notify()

            time.sleep(0.5)

 

 

def main():

    # Создание и запуск обработчика данных

    consumer = Consumer()

    consumer.start()

 

    # Создание и запуск производителя данных

    producer = Producer(consumer)

    producer.start()

 

 

if __name__ == '__main__':

    main()

 

Как видите, в обоих случаях код очень похожий. Сопрограммы позволяют описывать логику работы максимально приближенно к многопоточному коду, однако, в данном случае решение с сопрограммами короче, элегантнее и не требует синхронизации потоков.