Мониторинг и администрирование серверов
framework.zend.com
Stable релиз 2.0 / 1.12

Ruby & PHP. Starling и Zend_Queue.

К комментариям

В этой статье я расскажу о животрепещущем для многих вопросе. Как соединить между собой приложения на разных языках. Например, Ruby и PHP. В Twitter проблему интеграции с очередью решили с помощью Starling. Вообще сейчас намечается тенденция, что для каждой задачи подбирают свой язык. Гомогенных систем становится всё меньше. В следствии этого возникает потребность в стандартах на интеграцию разношерстного ПО в единую систему.


 

  1. Постановка задачи
  2. Что такое очереди сообщений?
  3. Что такое Starling?
  4. Ruby + Starling
  5. PHP + Starling (ZendExtra_Queue_Adapter_Starling)
  6. Преимущества и недостатки Starling
  7. Литература

1. Постановка задачи

Допустим у нас есть ситуация, при которой во время какого-либо HTTP запроса необходимо сделать длительную операцию. Например, при добавлении пользователем комментария к статье отправить email уведомление её автору. В классической схеме взаимодействия (рис 7-1) мы вынуждены ждать завершения этой операции, чтобы уведомить пользователя. Получается что эта операция блокирует выполнение веб-приложения, а значит происходит потеря производительности. Да и пользователю приходится ждать лишнее время. Когда количество пользователей невелико - это не проблема, но в Highload проектах этот вопрос приобретает большое значение. Мы определились с задачей - вынести все ресурсоемкие задачи в фоновые процессы. Рассмотрим случай с хостингом и добавлением домена в веб-панели. Задачей будет выполнение скрипта на Ruby, который будет производить манипуляции с DNS зонами и перезагружать DNS сервис.

Теперь определимся с схемой работы. Клиентом очереди будет ZendFramework приложение, которое будет добавлять задачи в очередь (отправлять сообщения в очередь). Сервером (воркером) будет Ruby, который будет принимать сообщения из очереди и обрабатывать их.

Почему выбрана именно такая комбинация? ZF для клиента исторически сложился, а вот сервер на Ruby выбран, т.к. очень не хочется давать PHP права root, думаю многие админы меня поймут. Выбор очереди был сделан в пользу Starling, т.к. она основана на Memcached и использует его протокол (ну почти :-) ) и реализация на руби предельно проста.

2. Что такое очереди сообщений

"А как вы организуете очередь?
- С помощью бабушек!"

с Хабрахабра

Тема очередей сообщений была навеяна докладом "Разделение труда: Организация многозадачной, распределенной системы в Zend Framework с помощью Job Queue" Александра Готгельфа на последнем ZFCONF. Он работал с сервером очередей Gearman, и рассказал о его немногочисленных, но очень серьезных багах (таких, как проблема освобождения памяти). В связи с этим я начал смотреть в торону других и остановился на MemcacheQ и Starling, который очень советовали пользователи хабра. А т.к. имплементация на руби проще со вторым он и был выбран.

3. Что такое Starling?

Starling - это скворец, певчая птица семейства скворцовых, широко распространённая на значительной территории Евразии, а также успешно интродуцированная в Южную Африку, Северную Америку, Австралию и Новую Зеландию. На юге и западе Европы ведёт оседлый образ жизни, а в северной и восточной её части является перелётной, в зимние месяцы мигрируя на юг. Внешне (размерами, желтым клювом и темным оперением) слегка напоминает чёрных дроздов, но в отличие от них ходит по земле, а не прыгает. :-)

А ещё так называется очередь сообщений, написанная на Ruby. Её используют в Twitter! Для установки сервера очередей у вас уже должен быть установлен Ruby. Сам же Starling ставится очень просто. Следующая строка выполняется только один раз!

gem sources -a http://gems.github.com/ Ставим гем (можно делать сколько угодно раз). sudo gem install starling

Если вы ставили руби для всех юзеров, то выполняем вместо sudo rvmsudo Дальше надо запустить сервер и можно коннектить серверов и клиентов.

andrey@comp$ starling
Starting at 127.0.0.1:22122.
I, [2011-05-20T10:53:34.346297 #6017] INFO -- : Starling STARTUP on 127.0.0.1:22122

Теперь можно подключать клиентов на руби и php. Клиентами будем называть тех, кто отправляет задания в очередь, а серверами - тех кто получает и обслуживает их (to serve). Теперь можно подключать клиентов на руби и php.

Общий формат данных. Ruby маршализация в PHP

Первое с чем я столкнулся это формат хранения сообщений в очереди. На выбор было несколько вариантов.

  • [б] Маршализация (marshalling) объектов. Так делает руби и это дефолтовый способ работы Starling,
  • [т] Сериализация (serializing). Так хранит объекты Zend Framework, а именно Zend_Queue,
  • [т] JSON. Стандарт де-факто для взаимодействия в гетерогенных средах,
  • [т] XML. Стандарт де-юро, однако имеет больший расход памяти для хранения сообщений, т.к. имеет теговую природу,

где [т] - текстовый формат, [б] -бинарный формат.

 

Скажу так, в начале у меня был соблазн вообще не трогать Starling, а научиться в PHP работать с маршализованными руби объектами. Но ничего похожего я найти не смог. Использовать XML не хотелось, для работы с JSON надо было бы здорово переписывать сервер очередей. Вот тогда то мне на глаза и попался гем php_serialize.

4. Ruby + Starling

Делает он ровно то, что я от него и ожидал, а именно выполняет serialize() функцию над Ruby объектами и наоборот. Итак схема такова: php -> serialize -> starling -> unserialize -> ruby. Для этого правда всё равно пришлось коё-что переписать в Starling. Приведу здесь код простейшего воркера очереди.

require 'starling'
require 'php_serialize'

class Starling
def get_raw(key, raw = false)
with_server(key) do |server, cache_key|
logger.debug { "get #{key} from #{server.inspect}" } if logger
value = cache_get server, cache_key
return nil if value.nil?
value = PHP.unserialize value unless raw
return value
end
rescue TypeError => err
handle_error nil, err
end
end
# --------------------------------------------------
starling = Starling.new('0.0.0.0:22122')

puts starling.sizeof('myqueue1')
puts starling.get_raw('myqueue1')

exit

Как видите, вместо Marshal.load из MemCache::get и использую PHP::unserialize и пропустив один уровень наследования (MemCache) для того, чтобы не поломать обычный MemCache клиент я внедрил его в класс Starling. Отлчино, теперь примемся за вторую часть нашей системы.

5. PHP + Starling (ZendExtra_Queue_Adapter_Starling)

В Zend Framework уже есть неплохой класс работы с очередями - Zend_Queue, но к сожалению он не поддерживает Starling. Для внедрения поддержки создадим наш адаптер, который отнаследуем от ближайшего родственника (Zend_Queue_Adapter_MemcacheQ).

В Зенд-Кью (а именно так читается Zend_Queue) очень хорошо реализована работа с адаптерами хранилищ. Каждый из них очень разный, и паттерн "Стратегия" тут бы не подошёл, однако авторы нашли выход и добавили функцию getCapabilities(), которая возвращает массив возможностей хранилища.

/**
* Supporting abilities
*
* @return array
*/
public function getCapabilities()
{
return array(
'create' => true,
'delete' => true,
'send' => true,
'receive' => true,
'deleteMessage' => false,
'getQueues' => true,
'count' => true,
'isExists' => true,
);
}

Для нашего хранилища есть возможность получить количество сообщений в очереди на обслуживание, но его надо реализовать. В написал/переписал/дописал часть функций адаптера в результате родился ZendExtra_Queue_Adapter_Starling

<?php

/**
* Adapter for storage messags in Starling queue
* Note: messages are stored serialized
* After recieving outside this adapter unserialization is needed
* For Ruby you can use gem "php_serialize" (not "php-serialize")
*/
require_once 'Zend/Queue/Adapter/AdapterAbstract.php';

class ZendExtra_Queue_Adapter_Starling extends Zend_Queue_Adapter_Memcacheq {
const DEFAULT_HOST = '127.0.0.1';
const DEFAULT_PORT = 22122;
const EOL = "\r\n";

/**
* Supporting abilities
*
* @return array
*/
public function getCapabilities() {
return array(
'create' => true,
'delete' => true,
'send' => true,
'receive' => true,
'deleteMessage' => false,
'getQueues' => true,
'count' => true,
'isExists' => true,
);
}

public function __construct($options, Zend_Queue $queue = null) {
if (!extension_loaded('memcache')) {
require_once 'Zend/Queue/Exception.php';
throw new Zend_Queue_Exception('Memcache extension does not appear to be loaded');
}

Zend_Queue_Adapter_AdapterAbstract::__construct($options, $queue); // instead of parent::, which is MemcachedQ

$options = &$this->_options['driverOptions'];

if (!array_key_exists('host', $options)) {
$options['host'] = self::DEFAULT_HOST;
}
if (!array_key_exists('port', $options)) {
$options['port'] = self::DEFAULT_PORT;
}

$this->_cache = new Memcache();

$result = $this->_cache->connect($options['host'], $options['port']);

if ($result === false) {
require_once 'Zend/Queue/Exception.php';
throw new Zend_Queue_Exception('Could not connect to MemcacheQ');
}

$this->_host = $options['host'];
$this->_port = (int) $options['port'];
}

/**
* Return queues list
*
* @return array
*/
public function getQueues() {
$this->_queues = array();

$response = $this->_sendCommand('stats', array('END'));

$postfixesArray = array('_items', '_total_items', '_logsize', '_expired_items', '_age', '_total', '_expired');
$arr = array();
foreach ($response as $i => $line) {
if (strpos($line, 'STAT queue_') === 0) { // Zero position (start), not false
$tmp = str_replace('STAT queue_', '', $line);
$tmp = explode(' ', $tmp);
$tmp = $tmp[0];
foreach ($postfixesArray as $postfix) {
$tmp = str_replace($postfix, '', $tmp);
}
$arr[] = $tmp;
}
}
$this->_queues = array_unique($arr);
return $this->_queues;
}

/**
* Return the approximate number of messages in the queue
*
* @return integer
* @throws Zend_Queue_Exception (not supported)
*/
public function count(Zend_Queue $queue = null) {
if ($queue === NULL) {
$keyName = 'queue_' . $this->getQueue()->getName() . '_items';
} else {
$keyName = 'queue_' . $queue->getName() . '_items';
}
$response = $this->_sendCommand('stats', array('END'));
foreach ($response as $line) {
if (strpos($line, $keyName) !== false) {
$tmp = str_replace('STAT ' . $keyName . ' ', '', $line);
return (int) $tmp;
}
}
return 0;
}

/**
* Get messages in the queue
*
* @param integer $maxMessages Maximum number of messages to return
* @param integer $timeout Visibility timeout for these messages
* @param Zend_Queue $queue
* @return Zend_Queue_Message_Iterator
* @throws Zend_Queue_Exception
*/
public function receive($maxMessages=null, $timeout=null, Zend_Queue $queue=null) {
if ($maxMessages === null) {
$maxMessages = 1;
}

if ($timeout === null) {
$timeout = self::RECEIVE_TIMEOUT_DEFAULT;
}
if ($queue === null) {
$queue = $this->_queue;
}

$msgs = array();

// Setting up the limit upon to queue count
if ($maxMessages != NULL) {
$count = $this->count($queue);
if ($count < $maxMessages)
$maxMessages = $count;
}

if ($maxMessages > 0) {
for ($i = 0; $i < $maxMessages; $i++) {
$data = unserialize($this->_cache->get($queue->getName()));
$msgs[] = $data;
}
}

$options = array(
'queue' => $queue,
'data' => $msgs,
'messageClass' => $queue->getMessageClass(),
);
$classname = $queue->getMessageSetClass();
if (!class_exists($classname)) {
require_once 'Zend/Loader.php';
Zend_Loader::loadClass($classname);
}
return new $classname($options);
}

/**
* Delete a queue and all of it's messages
*
* Returns false if the queue is not found, true if the queue exists
*
* @param string $name queue name
* @return boolean
* @throws Zend_Queue_Exception
*/
public function delete($name) {
$queue = $this->getQueue($name);
$count = $this->count($queue);
$this->receive($count, NULL, $queue);
}

/**
* Send a message to the queue
*
* @param string $message Message to send to the active queue
* @param Zend_Queue $queue
* @return Zend_Queue_Message
* @throws Zend_Queue_Exception
*/
public function send($message, Zend_Queue $queue=null) {
if ($queue === null) {
$queue = $this->_queue;
}

if (!$this->isExists($queue->getName())) {
require_once 'Zend/Queue/Exception.php';
throw new Zend_Queue_Exception('Queue does not exist:' . $queue->getName());
}

$message = serialize($message);
$data = array(
'message_id' => md5(uniqid(rand(), true)),
'handle' => null,
'body' => $message,
'md5' => md5($message),
);

$result = $this->_cache->set($queue->getName(), $message, 0, 0);
if ($result === false) {
require_once 'Zend/Queue/Exception.php';
throw new Zend_Queue_Exception('failed to insert message into queue:' . $queue->getName());
}

$options = array(
'queue' => $queue,
'data' => $data,
);

$classname = $queue->getMessageClass();
if (!class_exists($classname)) {
require_once 'Zend/Loader.php';
Zend_Loader::loadClass($classname);
}
return new $classname($options);
}

}

А вот и пример инициализации класса и работы с ним: StarlingZF.php

$queuePool = new Zend_Queue(new ZendExtra_Queue_Adapter_Starling(array()), array('name' => 'myqueue1'));
$queuePool->send(array('domain' => 'site.ru'));
var_dump($queuePool->receive()->toArray()); // Messages also deleting from queue

Или тоже самое но на уровне PHP: StarlingPHP.php

connect('localhost', 22122) or die ("Could not connect");
$data = array('domain' => 'test2.ru');
$memcache->set('myqueue1', serialize($data)) or die ("Failed to save data at the server");
print 'ok';

Как видите всё не так сложно. Теперь можно даже написать утилиту мониторинга (если вам не подходят существующие), которая будет следить за очередью.

6. Преимущества и недостатки Starling

Давайте посмотрим, что у нас получилось, и что нам это даёт.

  • Скорость. Данные хранятся в Memcache, который славится своей производительностью.
  • Масштабируемость. Очередь может быть распределена на несколько серверов.
  • Стандартизированность и унификация. Используется стандартный Memcached-протокол.

Но есть и недостатки.

  • Надежность. Т.к. все данные хранятся в памяти, то при перезапуске сервера задачи - теряются.

Но в некоторых случаях это может быть не критично. Например в случае хостера отметка о завершении задачи ставится в конце неё, и при необходимости клиентская часть сможет повторить запрос. В случае отправки email -есть внешняя очередь Postfix, куда ставятся сообщения на отправку, а Starling является буферной очередью для почтового сервера. В случае ресайза картинок, задача на ресайз может быть поставлена при повторном обращении ккартинке по проверки условия выполнения задачи. Вот, что пишут про неё сами разработчики.

Система надёжна, быстра и использует стандартный протокол memcached.

По заверениям разработчиков, это вообще самое стабильное звено твиттера. Когда другие элементы системы отключаются, Starling всегда продолжает работать. Так что не всё так плохо, важно понимать, что идеальных решений не существует и любое из них компромисс в ту или иную сторону и определитсья с приоритетами. Если приоритет - Highload, то, думаю, сквоец - отлчиное решение!

Автор: Токарчук Андрей

Тема на форуме
 

7. Литература

Пост про Straling в моём блоге
Ruby PHP Serializer
The complete guide to setting up Starling
Монитор очереди сообщений
Исходный код starling
Статья Ruby Asynchronous Messaging из книги Distributed Programming With Ruby'
Краткий обзор MQ (Messages queue) для применения в проектах на РНР. Часть 1, Часть 2
ZF Proposal- Zend_Queue_Adapter_Starling

метки: Starling, Zend_Queue
Лучший способ следить за обновлениями сайта это подписаться на RSS
Если информация была полезной для вас, вы можете поддержать сайт.
Комментарии:
Уважаемые пользователи. Комментарии не для того чтобы:
  1. Спрашивать почему у вас не работает код, для этого есть тема форума закрепленная за статьей.
  2. Спрашивать как реализовать ту или иную функциональность, для этого необходимо создать свою тему на форуме.

Комментарии для того чтобы: высказать свое аргументированное мнение о статье, указать какие участки вызывают непонимание, что нужно исправить/улучшить, просто сказать спасибо.

Комментарии имеют древовидную структуру.
Если вы хотите ответить на определенный комментарий - нажмите на ссылку "Ответить" возле этого комментария.

Комментарии не соответствующие этим правилам могут быть удалены. Спасибо за понимание.
Комментарии временно отключены, вы можете воспользоваться форумом.