Coroutines в PHP и работа с неблокирующими функциями

Одним из самых больших нововведений в PHP 5.5 будет поддержка генераторов и корутин (сопрограмм). Генераторы уже достаточно были освещены в документации и в нескольких других постах (например в этом или в этом). Сопрограммы же получили очень мало внимания. Это гораздо более мощный, но и более сложный для понимания и объяснения, инструмент.

В этой статье я покажу как реализовать планировщик задач с использованием корутин, чтобы вы поняли, что с ними можно делать и как их применять. Начнем с нескольких вступительных слов. Если вы считаете, что вы уже достаточно хорошо знаете как работают генераторы и корутины, тогда можете сразу перейти к разделу «Совместная многозадачность».

Генераторы

Суть генератора в том, что это функция, которая возвращает не просто одно значение, а последовательность значений, где каждое значение выброшено одно за другим. Или, другими словами, генераторы позволяют вам реализовать итератор, без лишнего кода.

Очень простым примером может послужить функция xrange():

function xrange($start, $end, $step = 1) {
    for ($i = $start; $i <= $end; $i += $step) {
        yield $i;
    }
}
foreach (xrange(1, 1000000) as $num) {
    echo $num, "\n";
}

Эта функция делает то же, что и встроенная в PHP, range(). Единственная разница заключается в том, что range() вернула бы вам массив из миллиона чисел, а xrange() вернет итератор, который будет выбрасывать эти числа, но никогда не будет создавать массив со всеми ними.

Преимущества такого подхода должны быть очевидны. Таким образом вы можете работать с очень большими наборами данных, не загружая их всех в память. Вы даже можете работать с бесконечнымипотоками данных.

Все это также может быть сделано без генераторов, создав класс имплементирующий интерфейс Iterator. Генераторы просто делают это (гораздо) более простым занятием, потому что вам не нужно имплементировать пять разных методов для каждого итератора.

Генераторы как прерываемые функции

Чтобы перейти от генераторов к корутинам, важно понять, как они устроены изнутри: генераторы — это прерываемые функции, в которых для прерываний служит ключевое слово yield.

Возвращаясь к предыдущему примеру, когда вы вызываете xrange(1, 1000000) — ничего из тела функции xrange на самом деле не будет вызвано. Вместо этого, PHP просто возвратит объект класса Generator, который имплементирует интерфейс Iterator:

<?php
$range = xrange(1, 1000000);
var_dump($range); // object(Generator)#1
var_dump($range instanceof Iterator); // bool(true)

Код будет запущен только тогда, когда вы запустите определенные методы итератора. Например если вы вызовете $range->rewind() код функции xrange() будет выполнен до первого yield. В этом случае это значит, что сначала будет выполнен $i = $start и затем yield $i. Что бы мы ни передали в yield, это можно будет получить вызовом $range->current().

Чтобы продолжить исполнение кода, вы должны вызвать $range->next(). Это опять заставит генератор исполнить код до следующего yield. Таким образом, используя последовательные вызовы next() и current(), вы можете получить все значения из генератора, пока он не достигнет той точки, когда код просто закончится. В случае с xrange() это произойдет, когда $i достигнет $end. В этом случае поток исполнения достигнет конца функции, не оставив больше кода. После того, как это произойдет, valid() будет возвращать false и итерирование прекратится.

Сопрограммы

Главное, что корутины добавляют к вышеописанной функциональности — это возможность отправлять значения генератору. Это делает из привычного монолога генератора полноценный диалог, где вызывающая сторона может также что-то сообщать генератору.

Значения передаются корутине вызовом метода send() вместо next(). Примером того, как это работает, послужит вот эта корутина:

function logger($fileName) {
    $fileHandle = fopen($fileName, 'a');
    while (true) {
        fwrite($fileHandle, yield . "\n");
    }
}
$logger = logger(__DIR__ . '/log');
$logger->send('Foo');
$logger->send('Bar');

Как вы видите, здесь yield не как statement (как например return или echo), а как выражение, то есть он возвращает какое-то значение. Он возвратит то, что было послано через send(). В данном примере yield сначала возвратит "Foo", а потом "Bar".

В этом примере было представлено как yield может выступать в качестве простого получателя. Но вы также можете комбинировать оба типа использования, таким образом вы сможете и посылать и получать значения. Вот пример того, как это работает:

function gen() {
    $ret = (yield 'yield1');
    var_dump($ret);
    $ret = (yield 'yield2');
    var_dump($ret);
}
$gen = gen();
var_dump($gen->current());    // string(6) "yield1"
var_dump($gen->send('ret1')); // string(4) "ret1"   (the first var_dump in gen)
                              // string(6) "yield2" (the var_dump of the ->send() return value)
var_dump($gen->send('ret2')); // string(4) "ret2"   (again from within gen)
                              // NULL               (the return value of ->send())

Точный порядок вывода может показаться немного сложным для понимания на первый взгляд, так что перечитайте код и попробуйте запустить сами, чтобы разобраться почему все происходит именно в таком порядке. Здесь есть две вещи, на которые я бы хотел обратить ваше внимание: первое, использование скобок вокруг yield — это не случайность. Эти скобки нужны нам по техническим причинам (я даже подумывал добавить выброс исключения на прямое присваивание). Во-вторых, вы должно быть заметили, что current() был использован без вызова rewind()rewind(), на самом деле, в таком случае вызывается неявно.

Совместная многозадачность

Если читая пример функции logger(), вы подумали «Зачем я буду использовать для этого корутину? Почему бы не сделать для этого обычный класс?», тогда вы были абсолютно правы. Тот пример лишь демонстрирует, как этим можно пользоваться, но в нем нет никаких причин, чтобы использовать корутины. Как было сказано выше, во введении, корутины — это очень мощная штука, но их применение очень редко и часто сильно усложнено, что делает задачу придумать простые и не надуманные примеры довольно сложной.

Я решил показать вам реализацию совместной мультизадачности, используя корутины. Суть в том, что у нас есть несколько задач, которые надо запустить параллельно. Но процессор (прим. пер. сферический и в вакууме) может исполнять лишь одну задачу в один момент времени. Таким образом, процессору нужно переключаться между разными задачами и давать каждой «немного поработать».

«Совместная» эта многозадачность потому, что она подразумевает добровольную передачу контроля исполнения планировщику, чтобы тот мог запустить другую задачу. Есть также вытесняющая многозадачность, где планировщик сам может прервать задачу. Совместная многозадачность использовалась в ранних версиях Windows (до Win95) и Mac OS, но потом они переключились на вытесняющую. Причина очевидна — если вы полагаетесь на какую-либо программу, чтобы она добровольно отдала поток управления, то любая программа может просто оккупировать весь CPU.

Сейчас вы уже должны видеть связь между корутинами и планировщиком задач: yield дает возможность прерываться задаче самостоятельно, чтобы отдать поток управления планировщику и он мог бы запустить другую задачу. Помимо этого, yield может быть использован для коммуникации задачи с планировщиком.

В нашем случае, задача будет тонкой оберткой вокруг функции-генератора:

class Task {
    protected $taskId;
    protected $coroutine;
    protected $sendValue = null;
    protected $beforeFirstYield = true;
    public function __construct($taskId, Generator $coroutine) {
        $this->taskId = $taskId;
        $this->coroutine = $coroutine;
    }
    public function getTaskId() {
        return $this->taskId;
    }
    public function setSendValue($sendValue) {
        $this->sendValue = $sendValue;
    }
    public function run() {
        if ($this->beforeFirstYield) {
            $this->beforeFirstYield = false;
            return $this->coroutine->current();
        } else {
            $retval = $this->coroutine->send($this->sendValue);
            $this->sendValue = null;
            return $retval;
        }
    }
    public function isFinished() {
        return !$this->coroutine->valid();
    }
}

У каждой задачи будет свой идентификатор (taskId). Используя метод setSendValue() вы можете указать, какое значение послать задаче на следующем запуске (зачем это нужно узнаете позже). Метод run() на самом деле всего лишь вызывает метод send() корутины.
Чтобы понять, зачем нам нужен дополнительный флаг beforeFirstYield, посмотрите на следующий код:

function gen() {
    yield 'foo';
    yield 'bar';
}
$gen = gen();
var_dump($gen->send('something'));
// Когда вызывается send(), перед первым yield произойдет неявный вызов rewind()
// Вот что на самом деле будет происходить:
$gen->rewind();
var_dump($gen->send('something'));
// rewind() перейдет к первому yield (и опустит его значение), send()
// перейдет ко второму yield (и var_dump-нет его значение).
// Таким образом мы потеряли первое значение

С помощью beforeFirstYield мы будем знать, был ли уже возвращен первый yield.

Планировщику теперь придется сделать чуть больше, чем просто пройтись по всем таскам:

class Scheduler {
    protected $maxTaskId = 0;
    protected $taskMap = []; // taskId => task
    protected $taskQueue;
    public function __construct() {
        $this->taskQueue = new SplQueue();
    }
    public function newTask(Generator $coroutine) {
        $tid = ++$this->maxTaskId;
        $task = new Task($tid, $coroutine);
        $this->taskMap[$tid] = $task;
        $this->schedule($task);
        return $tid;
    }
    public function schedule(Task $task) {
        $this->taskQueue->enqueue($task);
    }
    public function run() {
        while (!$this->taskQueue->isEmpty()) {
            $task = $this->taskQueue->dequeue();
            $task->run();
            if ($task->isFinished()) {
                unset($this->taskMap[$task->getTaskId()]);
            } else {
                $this->schedule($task);
            }
        }
    }
}

Метод newTask() создает новую задачу и кладет ее в taskMap. Более того, он добавляет ее в очередь taskQueue. Метод run() потом проходит по этой очереди и запускает задачи. Если задача завершена — она удаляется, иначе добавляется в конец очереди.

Давайте попробуем планировщик с двумя простыми (и довольно бессмысленными) задачами:

function task1() {
    for ($i = 1; $i <= 10; ++$i) {
        echo "This is task 1 iteration $i.\n";
        yield;
    }
}
function task2() {
    for ($i = 1; $i <= 5; ++$i) {
        echo "This is task 2 iteration $i.\n";
        yield;
    }
}
$scheduler = new Scheduler;
$scheduler->newTask(task1());
$scheduler->newTask(task2());
$scheduler->run();

Обе задачи просто выводят сообщение, и отдают поток управления назад планировщику. Вот что будет выведено:

This is task 1 iteration 1.
This is task 2 iteration 1.
This is task 1 iteration 2.
This is task 2 iteration 2.
This is task 1 iteration 3.
This is task 2 iteration 3.
This is task 1 iteration 4.
This is task 2 iteration 4.
This is task 1 iteration 5.
This is task 2 iteration 5.
This is task 1 iteration 6.
This is task 1 iteration 7.
This is task 1 iteration 8.
This is task 1 iteration 9.
This is task 1 iteration 10.

 

Взаимодействие с планировщиком

Итак, наш планировщик работает и мы можем перейти к следующему пункту в повестке дня: взаимодействие между задачами и планировщиком. Мы будем использовать тот же способ, что используют процессы чтобы общаться с операционной системой: системные вызовы (syscall, сискол). Нам нужны сисколы, так как операционная система находится на другом уровне привилегей, нежели сами процессы. Поэтому, чтобы делать какие-то действия, требующие больших привилегий (например убивать другие процессы) должен быть способ отдать контроль ядру, чтобы оно могло произвести те действия. Изнутри это реализовано с помощью прерывающих инструкций.

Наш планировщик будет придерживаться этой архитектуры: вместо того, чтобы передавать планировщик задаче (которая может с ним сделать все что угодно), мы будем взаимодействовать с помощью сисколов, переданных через yield. В данном случае, yield будет выступать и как прерыватель и как способ передать информацию (и получить) планировщику.

Чтобы представить сискол, я буду использовать небольшую обертку над callable:

class SystemCall {
    protected $callback;
    public function __construct(callable $callback) {
        $this->callback = $callback;
    }
    public function __invoke(Task $task, Scheduler $scheduler) {
        $callback = $this->callback; // Can't call it directly in PHP :/
        return $callback($task, $scheduler);
    }
}

Она будет вести себя, как любая другая callable, но будет принимать в аргументы задачу и планировщик. Чтобы работать с этим, нам надо немного модифицировать метод run() планировщика:

public function run() {
    while (!$this->taskQueue->isEmpty()) {
        $task = $this->taskQueue->dequeue();
        $retval = $task->run();
        if ($retval instanceof SystemCall) {
            $retval($task, $this);
            continue;
        }
        if ($task->isFinished()) {
            unset($this->taskMap[$task->getTaskId()]);
        } else {
            $this->schedule($task);
        }
    }
}

Наш первый сискол просто вернет идентификатор задачи:

function getTaskId() {
    return new SystemCall(function(Task $task, Scheduler $scheduler) {
        $task->setSendValue($task->getTaskId());
        $scheduler->schedule($task);
    });
}

Это происходят потому, что мы ставим значение для отправки и ставим задачу обратно в планировщик. Для сисколов планировщик не добавляет задачи в очередь автоматически, нам нужно делать это вручную (вы узнаете почему чуть позже).
Используя этот новый сискол, мы можем переписать предыдущий пример:

function task($max) {
    $tid = (yield getTaskId()); // <-- here's the syscall!
    for ($i = 1; $i <= $max; ++$i) {
        echo "This is task $tid iteration $i.\n";
        yield;
    }
}
$scheduler = new Scheduler;
$scheduler->newTask(task(10));
$scheduler->newTask(task(5));
$scheduler->run();

Этот код даст нам тот же вывод, что и предыдущий пример. Еще пара сисколов, для создания новых и убивания задач:

function newTask(Generator $coroutine) {
    return new SystemCall(
        function(Task $task, Scheduler $scheduler) use ($coroutine) {
            $task->setSendValue($scheduler->newTask($coroutine));
            $scheduler->schedule($task);
        }
    );
}
function killTask($tid) {
    return new SystemCall(
        function(Task $task, Scheduler $scheduler) use ($tid) {
            $task->setSendValue($scheduler->killTask($tid));
            $scheduler->schedule($task);
        }
    );
}

Для функции killTask нам понадобится дополнительный метод в планировщкие:

public function killTask($tid) {
    if (!isset($this->taskMap[$tid])) {
        return false;
    }
    unset($this->taskMap[$tid]);
    // Это немного криво и можно было бы оптимизировать,
    // чтобы не итерироваться по всей очереди, но я оставлю это на потом
    foreach ($this->taskQueue as $i => $task) {
        if ($task->getTaskId() === $tid) {
            unset($this->taskQueue[$i]);
            break;
        }
    }
    return true;
}

Маленький скрипт для теста этой функциональности:

function childTask() {
    $tid = (yield getTaskId());
    while (true) {
        echo "Child task $tid still alive!\n";
        yield;
    }
}
function task() {
    $tid = (yield getTaskId());
    $childTid = (yield newTask(childTask()));
    for ($i = 1; $i <= 6; ++$i) {
        echo "Parent task $tid iteration $i.\n";
        yield;
        if ($i == 3) yield killTask($childTid);
    }
}
$scheduler = new Scheduler;
$scheduler->newTask(task());
$scheduler->run();

Вывод будет следующим:

Parent task 1 iteration 1.
Child task 2 still alive!
Parent task 1 iteration 2.
Child task 2 still alive!
Parent task 1 iteration 3.
Child task 2 still alive!
Parent task 1 iteration 4.
Parent task 1 iteration 5.
Parent task 1 iteration 6.

Дочерняя задача убивается после трех итераций, тогда и завершаются сообщения «Child task still alive!». Стоит отметить, что это не настоящие родитель и ребенок, так как дочерняя задача может продолжать исполняться даже когда родительская уже завершилась. Или дочерняя может даже убить родительскую. Мы бы могли модифицировать планировщик, чтобы получить нормальное отношение между родителем и ребенком, но не в этой статье.

Есть еще довольно много разных типов вызовов, которые можно реализовать, например wait (ждать пока задача не завершится полностью), exec (который задает, какую задачу сейчас нужно исполнить) и fork (создание клона задачи). Клонирование довольная классная фича и вы можете ее имплементировать с корутинами, так как они поддерживают клонирование.

Неблокирующее взаимодействие

Действительно классное применения нашего планировщика очевидно — веб-сервер. Может быть одна задача, слушающая сокет на предмет наличия новых соединений и каждый раз как новое соединение сделано, будет создана новая задача для обработки этого соединения.

Сложность заключается в том, что операции на сокетах, как например чтение данных в PHP блокирующие, то есть PHP будет ждать пока клиент не завершит отправку данных. Для веб-сервера это, очевидно, совсем не хорошо: это означает, что он сможет обрабатывать лишь один запрос в момент времени.

В качестве решения нам надо спрашивать у сокета, «готов» ли он, перед тем как читать или писать данные в него. Чтобы узнать, какие сокеты готовы для передачи или получения данных, мы будем использовать функцию stream_select().

Для начала давайте добавим пару новых сисколов, которые будут отправлять определенный сокет на ожидание либо чтения, либо записи:

function waitForRead($socket) {
    return new SystemCall(
        function(Task $task, Scheduler $scheduler) use ($socket) {
            $scheduler->waitForRead($socket, $task);
        }
    );
}
function waitForWrite($socket) {
    return new SystemCall(
        function(Task $task, Scheduler $scheduler) use ($socket) {
            $scheduler->waitForWrite($socket, $task);
        }
    );
}

Эти сисколы просто прокси для соответствующих методов планировщика:

<?php
// resourceID => [socket, tasks]
protected $waitingForRead = [];
protected $waitingForWrite = [];
public function waitForRead($socket, Task $task) {
    if (isset($this->waitingForRead[(int) $socket])) {
        $this->waitingForRead[(int) $socket][1][] = $task;
    } else {
        $this->waitingForRead[(int) $socket] = [$socket, [$task]];
    }
}
public function waitForWrite($socket, Task $task) {
    if (isset($this->waitingForWrite[(int) $socket])) {
        $this->waitingForWrite[(int) $socket][1][] = $task;
    } else {
        $this->waitingForWrite[(int) $socket] = [$socket, [$task]];
    }
}

Поля waitingForRead и waitingForWrite просто массивы содержащие ожидающие сокеты и относящиеся к ним задачи. Самая интересная часть это этот метод, который проверяет готовы ли сокеты и перепланирует их таски:

protected function ioPoll($timeout) {
    $rSocks = [];
    foreach ($this->waitingForRead as list($socket)) {
        $rSocks[] = $socket;
    }
    $wSocks = [];
    foreach ($this->waitingForWrite as list($socket)) {
        $wSocks[] = $socket;
    }
    $eSocks = []; // dummy
    if (!stream_select($rSocks, $wSocks, $eSocks, $timeout)) {
        return;
    }
    foreach ($rSocks as $socket) {
        list(, $tasks) = $this->waitingForRead[(int) $socket];
        unset($this->waitingForRead[(int) $socket]);
        foreach ($tasks as $task) {
            $this->schedule($task);
        }
    }
    foreach ($wSocks as $socket) {
        list(, $tasks) = $this->waitingForWrite[(int) $socket];
        unset($this->waitingForWrite[(int) $socket]);
        foreach ($tasks as $task) {
            $this->schedule($task);
        }
    }
}

Функция stream_select берет на вход массивы сокетов ожидающих чтения, записи и исключения (проигнорируем последний). Массивы передаются по ссылке и эта функция оставит в них лишь те элементы, состояние которых изменилось. После чего мы можем пройтись по всем этим массивам и перепланировать все задачи связанные с данными сокетами.

Чтобы произвести все эти действия, мы добавим в планировщик следующий метод:

protected function ioPollTask() {
    while (true) {
        if ($this->taskQueue->isEmpty()) {
            $this->ioPoll(null);
        } else {
            $this->ioPoll(0);
        }
        yield;
    }
}

Эту задачу надо зарегистрировать в какой-то момент, например вы можете добавить $this->newTask($this->ioPollTask()) в начало метода run(). Тогда она будет работать также, как любая другая задача, проверяя доступные готовые сокеты на каждое переключение между задачами. Метод ioPollTask вызовет ioPoll с нулевым таймаутом, значит stream_select вернет результат сразу, без ожидания.

Только если очередь задач пуста, мы используем null в качестве таймаута, в таком случае stream_select будет ожидать, пока какой-либо из сокетов не будет готов. Если бы мы этого не делали, то съели бы весь CPU (по крайней мере ядро), т.к. эта задача выполнялась бы в цикле раз за разом, пока кто-нибудь не подключился бы.

Сам сервер выглядит довольно просто:

function server($port) {
    echo "Starting server at port $port...\n";
    $socket = @stream_socket_server("tcp://localhost:$port", $errNo, $errStr);
    if (!$socket) throw new Exception($errStr, $errNo);
    stream_set_blocking($socket, 0);
    while (true) {
        yield waitForRead($socket);
        $clientSocket = stream_socket_accept($socket, 0);
        yield newTask(handleClient($clientSocket));
    }
}
function handleClient($socket) {
    yield waitForRead($socket);
    $data = fread($socket, 8192);
    $msg = "Received following request:\n\n$data";
    $msgLength = strlen($msg);
    $response = <<<RES
HTTP/1.1 200 OK\r
Content-Type: text/plain\r
Content-Length: $msgLength\r
Connection: close\r
\r
$msg
RES;
    yield waitForWrite($socket);
    fwrite($socket, $response);
    fclose($socket);
}
$scheduler = new Scheduler;
$scheduler->newTask(server(8000));
$scheduler->run();

Он будет принимать соединения на 8000 порт и просто отправлять в ответе содержимое самого запроса. Сделать что-либо «настоящее» было бы гораздо сложнее (правильная обработка HTTP-запросов — не тема этой статьи).

Теперь вы можете испытать этот сервер с помощью чего-то вроде ab -n 10000 -c 100 localhost:8000/. Так мы отправим 10000 запросов, 100 из которых будут слаться одновременно. Используя этот бенчмарк я получил средний ответ за 10 миллисекунд. Но была проблема с некоторыми запросами, которые обрабатывались очень долго (в райное 5 секунд), поэтому общая пропускная способность всего 2000 запросов в секунду. С более высокой конкуренцией (например -c 500) скрипт работает также неплохо, но некоторые соединения выбрасывают ошибку «Connections reset by peer».

Вынесенные сопрограммы

Если вы начнете делать большую систему, используя наш планировщик, вы вскоре наткнетесь на одну проблему: мы часто разделяли код, вынося какие-то куски в отдельные функции и вызывая их. Но с сопрограммами это невозможно. Представим такой код:

function echoTimes($msg, $max) {
    for ($i = 1; $i <= $max; ++$i) {
        echo "$msg iteration $i\n";
        yield;
    }
}
function task() {
    echoTimes('foo', 10); // print foo ten times
    echo "---\n";
    echoTimes('bar', 5); // print bar five times
    yield; // force it to be a coroutine
}
$scheduler = new Scheduler;
$scheduler->newTask(task());
$scheduler->run();

В этом коде мы попытались вынести сопрограмму из task()-а и выполнить ее. Но это не сработает. Как было сказано в самом начале статьи, вызов генератора не исполнит никакого кода в нем, вместо этого только вернет объект класса Generator. В нашем случае именно это и происходит, вызовы echoTimes()ничего не сделают, только возвратят объект.

Чтобы у нас это получилось, нам нужно написать небольшую обертку для сопрограмм. Таким образом так мы сможем вызывать под-сопрограммы:

$retval = (yield someCoroutine($foo, $bar));

Под-сопрограммы тоже могут возвращать значение с помощью yield:

yield retval("I'm return value!");

Функция retval() ничего не делает, кроме как возвращает обертку над значением, которая говорит нам о том, что это возвращаемое значение:

class CoroutineReturnValue {
    protected $value;
    public function __construct($value) {
        $this->value = $value;
    }
    public function getValue() {
        return $this->value;
    }
}
function retval($value) {
    return new CoroutineReturnValue($value);
}

Чтобы сделать из обычной сопрограммы составную (с под-сопрограммами) нам придется написать еще одну функцию (которая очевидно — еще одна сопрограмма):

function stackedCoroutine(Generator $gen) {
    $stack = new SplStack;
    for (;;) {
        $value = $gen->current();
        if ($value instanceof Generator) {
            $stack->push($gen);
            $gen = $value;
            continue;
        }
        $isReturnValue = $value instanceof CoroutineReturnValue;
        if (!$gen->valid() || $isReturnValue) {
            if ($stack->isEmpty()) {
                return;
            }
            $gen = $stack->pop();
            $gen->send($isReturnValue ? $value->getValue() : NULL);
            continue;
        }
        $gen->send(yield $gen->key() => $value);
    }
}

Эта функция работает как простая прокси между вызывающим и исполняющейся под-сопрограммой. Также, она проверяет является ли возвращаемое значение также генератором и, если да, запускает его. Когда она получает объект CoroutineReturnValue, она возьмет родительскую сопрограмму и продолжит ее выполнение.

Чтобы составные сопрограммы можно было использовать в задачах, строку $this->coroutine = $coroutine; в конструкторе класса Task, нужно заменить на $this->coroutine = stackedCoroutine($coroutine);.

Теперь мы можем улучшить наш веб-сервер немного, сгруппировав функции ожидания с функциями чтения, записи и приема нового соединения. Для этого воспользуемся таким классом:

class CoSocket {
    protected $socket;
    public function __construct($socket) {
        $this->socket = $socket;
    }
    public function accept() {
        yield waitForRead($this->socket);
        yield retval(new CoSocket(stream_socket_accept($this->socket, 0)));
    }
    public function read($size) {
        yield waitForRead($this->socket);
        yield retval(fread($this->socket, $size));
    }
    public function write($string) {
        yield waitForWrite($this->socket);
        fwrite($this->socket, $string);
    }
    public function close() {
        @fclose($this->socket);
    }
}

Теперь сервер можно немного переписать:

function server($port) {
    echo "Starting server at port $port...\n";
    $socket = @stream_socket_server("tcp://localhost:$port", $errNo, $errStr);
    if (!$socket) throw new Exception($errStr, $errNo);
    stream_set_blocking($socket, 0);
    $socket = new CoSocket($socket);
    while (true) {
        yield newTask(
            handleClient(yield $socket->accept())
        );
    }
}
function handleClient($socket) {
    $data = (yield $socket->read(8192));
    $msg = "Received following request:\n\n$data";
    $msgLength = strlen($msg);
    $response = <<<RES
HTTP/1.1 200 OK\r
Content-Type: text/plain\r
Content-Length: $msgLength\r
Connection: close\r
\r
$msg
RES;
    yield $socket->write($response);
    yield $socket->close();
}

 

Обработка ошибок

Как хороший программист, вы очевидно заметили, что вышеприведенным примерам кода не хватает обработки ошибок. Любая операция с сокетами всегда может вывалиться в ошибку. Я сделал так, потому что обработка ошибок для примеров очень утомительна (особенно в случае с сокетами) и это раздуло бы наш код в несколько раз.

Но, тем не менее, я бы хотел рассказать про обработку ошибок для сопрограмм в целом: сопрограммы позволяют выбрасывать исключения внутрь них используя метод throw().

Метод throw() берет исключение в первый аргумент и выбрасывает его в том месте, где стоит текущий yield (чье значение можно получить с помощью метода current()):

function gen() {
    echo "Foo\n";
    try {
        yield;
    } catch (Exception $e) {
        echo "Exception: {$e->getMessage()}\n";
    }
    echo "Bar\n";
}
$gen = gen();
$gen->rewind();                     // echos "Foo"
$gen->throw(new Exception('Test')); // echos "Exception: Test"
                                    // and "Bar"

Это очень классная вещь, т.к. мы можем, например, дать возможность через сисколы и под-сопрограммы выбрасывать исключения. Для сисколов, метод Scheduler::run() надо немного изменить:

if ($retval instanceof SystemCall) {
    try {
        $retval($task, $this);
    } catch (Exception $e) {
        $task->setException($e);
        $this->schedule($task);
    }
    continue;
}

И класс Task должен обрабатывать вызовы throw():

class Task {
    // ...
    protected $exception = null;
    public function setException($exception) {
        $this->exception = $exception;
    }
    public function run() {
        if ($this->beforeFirstYield) {
            $this->beforeFirstYield = false;
            return $this->coroutine->current();
        } elseif ($this->exception) {
            $retval = $this->coroutine->throw($this->exception);
            $this->exception = null;
            return $retval;
        } else {
            $retval = $this->coroutine->send($this->sendValue);
            $this->sendValue = null;
            return $retval;
        }
    }
    // ...
}

Теперь мы можем начать выбрасывать исключения из сисколов! Например для killTask, давайте выбросим исключение, если переданный идентификатор задачи невалиден:

function killTask($tid) {
    return new SystemCall(
        function(Task $task, Scheduler $scheduler) use ($tid) {
            if ($scheduler->killTask($tid)) {
                $scheduler->schedule($task);
            } else {
                throw new InvalidArgumentException('Invalid task ID!');
            }
        }
    );
}

Теперь попробуем:

function task() {
    try {
        yield killTask(500);
    } catch (Exception $e) {
        echo 'Tried to kill task 500 but failed: ', $e->getMessage(), "\n";
    }
}

Пока что это не будет работать, т.к. функция stackedCoroutine не обрабатывает исключения. Чтобы поправить это, модифицируем ее немного:

function stackedCoroutine(Generator $gen) {
    $stack = new SplStack;
    $exception = null;
    for (;;) {
        try {
            if ($exception) {
                $gen->throw($exception);
                $exception = null;
                continue;
            }
            $value = $gen->current();
            if ($value instanceof Generator) {
                $stack->push($gen);
                $gen = $value;
                continue;
            }
            $isReturnValue = $value instanceof CoroutineReturnValue;
            if (!$gen->valid() || $isReturnValue) {
                if ($stack->isEmpty()) {
                    return;
                }
                $gen = $stack->pop();
                $gen->send($isReturnValue ? $value->getValue() : NULL);
                continue;
            }
            try {
                $sendValue = (yield $gen->key() => $value);
            } catch (Exception $e) {
                $gen->throw($e);
                continue;
            }
            $gen->send($sendValue);
        } catch (Exception $e) {
            if ($stack->isEmpty()) {
                throw $e;
            }
            $gen = $stack->pop();
            $exception = $e;
        }
    }
}

 

Подводя итог

В этой статье мы построили планировщик задач, используя совместную мультизадачность, возможность слать сисколы, поработали с неблокирующими операциями и обработкой ошибок. Самая классная вещь во всем этом, это то, что конечный код выглядит абсолютно синхронным, даже не смотря на то, что он делает много асинхронных операций. Если вам нужно прочитать данные из сокета, вам не надо передавать коллбэк или регистрировать листенер. Вместо этого вы пишите yield $socket->read().

Когда я впервые услышал об этом, я понял, какая это крутая вещь, это и мотивировало меня имплементировать их в PHP. В то же время я нахожу сопрограммы действительно пугающими. Есть тонкая грань между классным кодом и жуткой грязью, и я думаю, сопрограммы стоят на этой грани. Мне сложно сказать, является ли использование сопрограмм в таком ключе, в каком я описал выше, каким-то выигрышным.

В любом случае, я думаю это очень интересная тема и надеюсь вы тоже ?

ПС от переводчика: статья очень большая и сложная для восприятия, поэтому могут быть ошибки, в т.ч. смысловые. если увидели такую — отправляйте пожалуйста.