Автор Тема: \Zend\Queue\Adapter\Starling  (Прочитано 2139 раз)

0 Пользователей и 1 Гость смотрят эту тему.

Оффлайн zizop

  • Опытный
  • ***
  • Сообщений: 70
  • Карма: 10
    • Zend Framework (PHP) программист
\Zend\Queue\Adapter\Starling
« : Сентябрь 20, 2011, 23:16:02 »
Тема очередей сообщений была навеяна докладом «Разделение труда: Организация многозадачной, распределенной системы в Zend Framework с помощью Job Queue»  Александра Готгельфа на последнем ZFCONF. Он работал с сервером очередей Gearman, и рассказал о его немногочисленных, но очень серьезных багах (таких, как проблема освобождения памяти). В связи с этим я начал смотреть в торону других и остановился на MemcacheQ и Starling, который очень советовали пользователи хабра. А т.к. имплементация на руби проще со вторым он и был выбран.

Starling - это очередь сообщений, которую в частности используют в Twitter. Это не значит, что надо всё бросить и начать её юзать, однако следует знать, что ресурсов она особо не кушает и дело своё выполняет хорошо, что проверено собственным опытом. Данный класс, это собственно не предложение, а уже готовое решение. Однако в ZF его пока ещё не включили, и там он значится как Proposal Zend_Queue_Adapter_Starling. Там же можно почитать мануал и примеры, а ниже приведу его исходничег, благо он не такой большой.
Описание танцев с бубном, которые привели к созданию этого класса, можно найти тут.


<?php
 
/**
 * Adapter for storage messags in Starling queue
 * @author Tokarchuk Andrey (netandreus@gmail.com)
 * @link http://tokarchuk.ru/2011/05/php-ruby-with-starling/
 * Note: messages are stored serialized
 * After recieving outside this adapter unserialization is needed
 * For Ruby you can use gem "php_serialize" (not "php-serialize")
 *
 * Usage:
 *
 * $queuePool = new Zend_Queue(new ZendExtra_Queue_Adapter_Starling(array()), array('name' => 'myqueue1'));
 * $queuePool->send(array('domain' => 'site.ru'));
 * var_dump($queuePool->receive()->toArray()); // Messages also deleting from queue
 *
 */
require_once 'Zend/Queue/Adapter/AdapterAbstract.php';
class 
ZendExtra_Queue_Adapter_Starling extends Zend_Queue_Adapter_Memcacheq
{
    const 
DEFAULT_HOST '127.0.0.1';
    const 
DEFAULT_PORT 22122;
    const 
EOL          "\r\n";
 
    
/**
     * Supporting abilities
     *
     * @return array
     */
    
public function getCapabilities()
    {
        return array(
            
'create'        => true,
            
'delete'        => true,
            
'send'          => true,
            
'receive'       => true,
            
'deleteMessage' => false,
            
'getQueues'     => true,
            
'count'         => true,
            
'isExists'      => true,
        );
    }
 
    public function 
__construct($optionsZend_Queue $queue null)
    {
        if (!
extension_loaded('memcache')) {
            require_once 
'Zend/Queue/Exception.php';
            throw new 
Zend_Queue_Exception('Memcache extension does not appear to be loaded');
        }
 
        
Zend_Queue_Adapter_AdapterAbstract::__construct($options$queue); // instead of parent::, which is MemcachedQ
 
        
$options = &$this->_options['driverOptions'];
 
        if (!
array_key_exists('host'$options)) {
            
$options['host'] = self::DEFAULT_HOST;
        }
        if (!
array_key_exists('port'$options)) {
            
$options['port'] = self::DEFAULT_PORT;
        }
 
        
$this->_cache = new Memcache();
 
        
$result $this->_cache->connect($options['host'], $options['port']);
 
        if (
$result === false) {
            require_once 
'Zend/Queue/Exception.php';
            throw new 
Zend_Queue_Exception('Could not connect to MemcacheQ');
        }
 
        
$this->_host $options['host'];
        
$this->_port = (int)$options['port'];
    }
 
    
/**
     * Return queues list
     *
     * @return array
     */
    
public function getQueues()
    {
        
$this->_queues = array();
 
        
$response $this->_sendCommand('stats', array('END'));
 
        
$postfixesArray = array('_items''_total_items''_logsize''_expired_items''_age''_total''_expired');
        
$arr = array();
        foreach (
$response as $i => $line) {
            if(
strpos($line'STAT queue_') === ) { // Zero position (start), not false
                
$tmp str_replace('STAT queue_'''$line);
                
$tmp explode(' '$tmp);
                
$tmp $tmp[0];
                foreach(
$postfixesArray as $postfix) {
                    
$tmp str_replace($postfix''$tmp);
                }
                
$arr[] = $tmp;
            }
        }
        
$this->_queues array_unique($arr);
        return 
$this->_queues;
    }
 
    
/**
     * Return the approximate number of messages in the queue
     *
     * @return integer
     * @throws Zend_Queue_Exception (not supported)
     */
    
public function count(Zend_Queue $queue null)
    {
        if(
$queue === NULL) {
            
$keyName 'queue_'.$this->getQueue()->getName().'_items';
        } else {
            
$keyName 'queue_'.$queue->getName().'_items';
        }
        
$response $this->_sendCommand('stats', array('END'));
        foreach(
$response as $line) {
            if(
strpos($line$keyName) !== false) {
                
$tmp str_replace('STAT '.$keyName.' '''$line);
                return (int) 
$tmp;
            }
        }
        return 
0;
    }
 
    
/**
     * Get messages in the queue
     *
     * @param  integer    $maxMessages  Maximum number of messages to return
     * @param  integer    $timeout      Visibility timeout for these messages
     * @param  Zend_Queue $queue
     * @return Zend_Queue_Message_Iterator
     * @throws Zend_Queue_Exception
     */
    
public function receive($maxMessages=null$timeout=nullZend_Queue $queue=null)
    {
        if (
$maxMessages === null) {
            
$maxMessages 1;
        }
 
        if (
$timeout === null) {
            
$timeout self::RECEIVE_TIMEOUT_DEFAULT;
        }
        if (
$queue === null) {
            
$queue $this->_queue;
        }
 
        
$msgs = array();
 
        
// Setting up the limit upon to queue count
        
if($maxMessages != NULL) {
            
$count $this->count($queue);
            if(
$count $maxMessages)
                
$maxMessages $count;
        }
 
        if (
$maxMessages ) {
            for (
$i 0$i $maxMessages$i++) {
                
$data unserialize($this->_cache->get($queue->getName()));
                
$msgs[] = $data;
            }
        }
 
        
$options = array(
            
'queue'        => $queue,
            
'data'         => $msgs,
            
'messageClass' => $queue->getMessageClass(),
        );
        
$classname $queue->getMessageSetClass();
        if (!
class_exists($classname)) {
            require_once 
'Zend/Loader.php';
            
Zend_Loader::loadClass($classname);
        }
        return new 
$classname($options);
    }
 
    
/**
     * Delete a queue and all of it's messages
     *
     * Returns false if the queue is not found, true if the queue exists
     *
     * @param  string  $name queue name
     * @return boolean
     * @throws Zend_Queue_Exception
     */
    
public function delete($name)
    {
        
$queue $this->getQueue($name);
        
$count $this->count($queue);
        
$this->receive($countNULL$queue);
    }
 
     
/**
     * Send a message to the queue
     *
     * @param  string     $message Message to send to the active queue
     * @param  Zend_Queue $queue
     * @return Zend_Queue_Message
     * @throws Zend_Queue_Exception
     */
    
public function send($messageZend_Queue $queue=null)
    {
        if (
$queue === null) {
            
$queue $this->_queue;
        }
 
        if (!
$this->isExists($queue->getName())) {
            require_once 
'Zend/Queue/Exception.php';
            throw new 
Zend_Queue_Exception('Queue does not exist:' $queue->getName());
        }
 
        
$message serialize($message);
        
$data    = array(
            
'message_id' => md5(uniqid(rand(), true)),
            
'handle'     => null,
            
'body'       => $message,
            
'md5'        => md5($message),
        );
 
        
$result $this->_cache->set($queue->getName(), $message00);
        if (
$result === false) {
            require_once 
'Zend/Queue/Exception.php';
            throw new 
Zend_Queue_Exception('failed to insert message into queue:' $queue->getName());
        }
 
        
$options = array(
            
'queue' => $queue,
            
'data'  => $data,
        );
 
        
$classname $queue->getMessageClass();
        if (!
class_exists($classname)) {
            require_once 
'Zend/Loader.php';
            
Zend_Loader::loadClass($classname);
        }
        return new 
$classname($options);
    }
 
 
}