PHP Classes
Icontem

File: include/class.MTDaemon.php


  Search   All class groups All class groups   Latest entries Latest entries   Top 10 charts Top 10 charts   Newsletter Newsletter   Blog Blog   Forums Forums   Help FAQ Help FAQ  
  Login   Register  
Recommend this page to a friend! ReTweet ReTweet Stumble It! Stumble It! Bookmark in del.icio.us Bookmark in del.icio.us
  Classes of Killerwhile  >  Multi Threaded Daemon  >  include/class.MTDaemon.php  
File: include/class.MTDaemon.php
Role: Class source
Content type: text/plain
Description: Main core class
Class: Multi Threaded Daemon
Manage the execution of parallel threads
 

Contents

Class file image Download
<?php

/**
 * MultiThreaded Daemon (MTD)
 * 
 * Copyright (c) 2007, Benoit Perroud
 * 
 * All rights reserved.
 * 
 * Redistribution and use in source and binary forms, with or
 * without modification, are permitted provided that the following
 * conditions are met: Redistributions of source code must retain the
 * above copyright notice, this list of conditions and the following
 * disclaimer. Redistributions in binary form must reproduce the above
 * copyright notice, this list of conditions and the following disclaimer
 * in the documentation and/or other materials provided with the
 * distribution.
 * 
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 * @package     MTD
 * @author      Benoit Perroud <ben@migtechnology.ch>
 * @copyright   2007 Benoit Perroud
 * @license     http://www.gnu.org/copyleft/gpl.html
 * @version     $Id: class.MTDaemon.php 8 2007-10-19 09:16:00Z killerwhile $
 *
 * See http://code.google.com/p/phpmultithreadeddaemon/ 
 * and http://phpmultithreaddaemon.blogspot.com/ for more information
 *
 */

require_once dirname(__FILE__) . '/class.MTLog.php';

abstract class MTDaemon
{

    /*
     * Configuration variables
     */

    // Maximum number of concurrent threads (forked children), i.e. the number of slots.
    // Should be implemented with sem_get('name', $max_aquire) but that can't be
    // dynamically updated the way this variable can.
    protected $max_threads = 4;

    // Time to sleep, in seconds, when getNext() reports that there is no job.
    protected $idle_sleep_time = 5;

    /*
     * Internal constants : keys of the variables stored in the shared memory segment
     */
    const _INDEX_DATA = 0;    // shared data goes here (array used by getVar() / setVar()).
    const _INDEX_THREADS = 1; // how many threads are working
    const _INDEX_SLOTS = 2;   // array of booleans : which slots are currently in use ?

    /*
     * Internal variables
     */
    protected $shm;                         // System V IPC key, ftok(__FILE__, 'g')
    protected $shared_data;                 // shared memory handle, shm_attach($this->shm)

    protected $mutex;                       // lock critical path, used in lock() and unlock()
    protected $mutex_main_process;          // used by lockMain() / unlockMain(); not used by this class itself
    protected $mutex_children_processes;    // used by lockChildren() / unlockChildren(); not used by this class itself

    protected $main_thread_pid;             // pid of the process that constructed the daemon

    // Main loop flag : handle() keeps iterating while this is true.
    // Declared explicitly (it used to be created dynamically in handle(), which is
    // deprecated since PHP 8.2) and kept public because a dynamically created
    // property was publicly accessible.
    public $run = false;

    /**
     * Constructor
     *
     * A value that evaluates to false (null, 0, ...) keeps the corresponding default.
     *
     * @param int|null $threads       number of concurrent threads, default 4
     * @param int|null $idlesleeptime time to sleep when no job is ready (getNext() returns
     *                                null or false), in seconds, default 5
     */
    public function __construct($threads = null, $idlesleeptime = null)
    {
        if ($threads) $this->max_threads = $threads;
        if ($idlesleeptime) $this->idle_sleep_time = $idlesleeptime;
        $this->main_thread_pid = posix_getpid();
    }

    /**
     * Hook called just before the main loop.
     *
     * Attaches the shared memory segment, creates the three semaphores and
     * initialises the shared state : empty data array, zero running threads and
     * $max_threads free slots.
     *
     * Remark : cleanup code goes here.
     */
    protected function _prerun()
    {
        MTLog::getInstance()->info('Starting daemon with ' . $this->max_threads . ' slots');

        $this->shm                      = ftok(__FILE__, 'g'); // global shm
        $this->shared_data              = shm_attach($this->shm);

        $this->mutex                    = sem_get($this->shm);
        $this->mutex_main_process       = sem_get(ftok(__FILE__, 'm'));
        $this->mutex_children_processes = sem_get(ftok(__FILE__, 'c'));

        shm_put_var($this->shared_data, self::_INDEX_DATA, array());

        $this->setThreads(0);

        // every slot starts free
        $slots = array();
        for ($i = 0; $i < $this->max_threads; $i++) {
            $slots[] = false;
        }
        shm_put_var($this->shared_data, self::_INDEX_SLOTS, $slots);
    }

    /**
     * Hook called just after the main loop.
     *
     * Removes the shared memory segment and every semaphore created in _prerun().
     */
    protected function _postrun()
    {
        MTLog::getInstance()->info('Stopping daemon. ');

        shm_remove($this->shared_data);
        sem_remove($this->mutex);
        // The two process-level semaphores are created in _prerun() and must be
        // removed as well (the code used to reference a non-existent
        // $this->mutex_process property and leaked both of them).
        sem_remove($this->mutex_main_process);
        sem_remove($this->mutex_children_processes);
    }

    /**
     * Main loop, request next job using getNext() and execute run($job) in a separate
     * thread (a forked child process).
     *
     * _prerun and _postrun hooks are called before and after the main loop -> useful
     * for cleanup and so on.
     *
     * The loop ends when $this->run becomes false, which this method does itself when
     * pcntl_wait() fails, when no slot can be assigned or when pcntl_fork() fails.
     * A forked child never returns from this method : it calls exit() with the value
     * returned by run().
     */
    public function handle()
    {

        $this->run = true;

        $this->_prerun();

        while ($this->run) {

            MTLog::getInstance()->debug2('-- Next iteration ');

            $this->lock();

            // HACK : reap every child that has already exited, to avoid zombies and
            // free the memory earlier. WNOHANG makes the call non blocking.
            do {
                $res = pcntl_wait($status, WNOHANG);
                MTLog::getInstance()->debug2('$res = pcntl_wait($status, WNOHANG); called with $res = ' . $res);
                if ($res > 0) MTLog::getInstance()->debug('(finishing child with pid ' . $res . ')');
            } while ($res > 0);

            /*
             * Loop until a slot frees.
             * The mutex is released while waiting so that children can update the
             * shared counters, and re-acquired before the next check.
             */
            while (!$this->hasFreeSlot()) {
                $this->unlock();
                MTLog::getInstance()->debug('No more free slot, waiting');
                $res = pcntl_wait($status); // wait until a child ends up
                MTLog::getInstance()->debug2('$res = pcntl_wait($status); called with $res = ' . $res);
                if ($res > 0) {
                    MTLog::getInstance()->debug('Finishing child with pid ' . $res);
                } else {
                    MTLog::getInstance()->error('Outch1, this souldn\'t happen. Verify your implementation ...');
                    $this->run = false;
                    // Leave BOTH loops. The mutex is not held at this point : a plain
                    // `continue` would re-enter this inner loop and end up calling
                    // unlock() on a mutex we do not own, which makes sem_release()
                    // fail and the process exit(-1) without running _postrun().
                    continue 2;
                }
                $this->lock();
            }

            // still inside the critical section : reserve a slot for the next job
            $slot = $this->requestSlot();
            $this->incThreads();

            $this->unlock();

            if ($slot === null) {
                // inconsistent shared state : dump it to help debugging, then stop
                var_dump(shm_get_var($this->shared_data, self::_INDEX_DATA));
                var_dump(shm_get_var($this->shared_data, self::_INDEX_THREADS));
                var_dump(shm_get_var($this->shared_data, self::_INDEX_SLOTS));

                MTLog::getInstance()->error('Outch2, this souldn\'t happen. Verify your implementation ...');
                $this->run = false;
                continue;
            }

            /*
             * Request next action to handle
             */
            $next = $this->getNext($slot);

            /*
             * If no job
             */
            if (!$next) {

                MTLog::getInstance()->debug('No job, sleeping at most ' . $this->idle_sleep_time . ' sec ... ');

// TODO : waiting for signal pushed into a queue when inserting a new job.

                // give back the slot reserved above, it was not needed
                $this->lock();
                $this->releaseSlot($slot);
                $this->decThreads();
                $this->unlock();

                sleep($this->idle_sleep_time);

                continue;

            } else {

                $pid = pcntl_fork();

                if ($pid == -1) {
                        MTLog::getInstance()->error('[fork] Duplication impossible');
                        $this->run = false;
                        continue;
                } else if ($pid) {

                        // parent process
                        usleep(10); // HACK : give the hand to the child -> a simple way to better handle zombies

                        continue;
                } else {

                    // child process : run the job, free the slot, then exit
                    MTLog::getInstance()->debug('Executing thread #' . posix_getpid() . ' in slot ' . number_format($slot));

                    $res = $this->run($next, $slot);

                    $this->lock();
                    $this->releaseSlot($slot);
                    $this->decThreads();
                    $this->unlock();

                    exit($res);

                }
            }
        }

        $this->_postrun();

    }

    /**
     * Request data of the next element to run in a thread.
     *
     * Called by the main process with the slot already reserved for the job.
     *
     * @param int $slot index of the slot reserved for the job
     * @return mixed the job, or null or false if no job currently
     */
    abstract public function getNext($slot);

    /**
     * Process the element fetched by getNext in a new thread.
     *
     * @param mixed $next the job returned by getNext()
     * @param int   $slot index of the slot assigned to the job
     * @return mixed the exiting status of the thread (passed to exit())
     */
    abstract public function run($next, $slot);

    /**
     * Acquire the mutex protecting the shared state (critical path).
     *
     * Terminates the current process with exit(-1) if the semaphore cannot be acquired.
     */
    protected function lock()
    {
        MTLog::getInstance()->debug2('[lock] lock');
        $res = sem_acquire($this->mutex);
        if (!$res) exit(-1);
    }

    /**
     * Release the mutex acquired by lock().
     *
     * Terminates the current process with exit(-1) if the semaphore cannot be released.
     */
    protected function unlock()
    {
        MTLog::getInstance()->debug2('[lock] unlock');
        $res = sem_release($this->mutex);
        if (!$res) exit(-1);
    }

    /**
     * Acquire the main process semaphore.
     *
     * Terminates the current process with exit(-1) if the semaphore cannot be acquired.
     */
    protected function lockMain()
    {
        MTLog::getInstance()->debug2('[lock] lock main process');
        $res = sem_acquire($this->mutex_main_process);
        if (!$res) exit(-1);
    }

    /**
     * Release the main process semaphore acquired by lockMain().
     *
     * Terminates the current process with exit(-1) if the semaphore cannot be released.
     */
    protected function unlockMain()
    {
        MTLog::getInstance()->debug2('[lock] unlock main process');
        $res = sem_release($this->mutex_main_process);
        if (!$res) exit(-1);
    }

    /**
     * Acquire the children processes semaphore.
     *
     * Terminates the current process with exit(-1) if the semaphore cannot be acquired.
     */
    protected function lockChildren()
    {
        MTLog::getInstance()->debug2('[lock] lock children processes');
        $res = sem_acquire($this->mutex_children_processes);
        if (!$res) exit(-1);
    }

    /**
     * Release the children processes semaphore acquired by lockChildren().
     *
     * Terminates the current process with exit(-1) if the semaphore cannot be released.
     */
    protected function unlockChildren()
    {
        MTLog::getInstance()->debug2('[lock] unlock children processes');
        $res = sem_release($this->mutex_children_processes);
        if (!$res) exit(-1);
    }

    /**
     * Get a shared var based on its name.
     *
     * @param string|int $name key of the var in the shared data array
     * @param bool       $lock true to wrap the access in lock() / unlock(); leave false
     *                         when the caller already holds the mutex
     * @return mixed the value, or null if the var doesn't exist
     */
    protected function getVar($name, $lock = false)
    {
        if ($lock) $this->lock();
        $vars = shm_get_var($this->shared_data, self::_INDEX_DATA);
        $value = (isset($vars[$name])) ? $vars[$name] : null;
        if ($lock) $this->unlock();
        return $value;
    }

    /**
     * Set a shared var.
     *
     * Remark : the value should be serializable.
     *
     * @param string|int $name  key of the var in the shared data array
     * @param mixed      $value value to store
     * @param bool       $lock  true to wrap the read-modify-write in lock() / unlock();
     *                          leave false when the caller already holds the mutex
     * @return bool result of shm_put_var()
     */
    protected function setVar($name, $value, $lock = false)
    {
        if ($lock) $this->lock();
        $vars = shm_get_var($this->shared_data, self::_INDEX_DATA);
        $vars[$name] = $value;
        $res = shm_put_var($this->shared_data, self::_INDEX_DATA, $vars);
        if ($lock) $this->unlock();
        return $res;
    }

    /**
     * Get the number of running threads.
     *
     * @param bool $lock true to wrap the access in lock() / unlock()
     * @return mixed the counter stored in shared memory
     */
    protected function getThreads($lock = false)
    {
        if ($lock) $this->lock();
        $res = shm_get_var($this->shared_data, self::_INDEX_THREADS);
        if ($lock) $this->unlock();
        return $res;
    }

    /**
     * Set the number of running threads.
     *
     * @param int  $threads new value of the counter
     * @param bool $lock    true to wrap the access in lock() / unlock()
     * @return bool result of shm_put_var()
     */
    protected function setThreads($threads, $lock = false)
    {
        if ($lock) $this->lock();
        $res = shm_put_var($this->shared_data, self::_INDEX_THREADS, $threads);
        if ($lock) $this->unlock();
        return $res;
    }

    /**
     * Increment the number of running threads.
     *
     * @param bool $lock true to wrap the read-modify-write in lock() / unlock()
     * @return bool result of shm_put_var()
     */
    protected function incThreads($lock = false)
    {
        if ($lock) $this->lock();
        $threads = $this->getThreads();
        $res = shm_put_var($this->shared_data, self::_INDEX_THREADS, $threads + 1);
        MTLog::getInstance()->debug('incThreads, $threads = ' . ($threads + 1));
        if ($lock) $this->unlock();
        return $res;
    }

    /**
     * Decrement the number of running threads.
     *
     * @param bool $lock true to wrap the read-modify-write in lock() / unlock()
     * @return bool result of shm_put_var()
     */
    protected function decThreads($lock = false)
    {
        if ($lock) $this->lock();
        $threads = $this->getThreads();
        $res = shm_put_var($this->shared_data, self::_INDEX_THREADS, $threads - 1);
        MTLog::getInstance()->debug('decThreads, $threads = ' . ($threads - 1));
        if ($lock) $this->unlock();
        return $res;
    }

    /**
     * Return true if any slot is free, i.e. if fewer than $max_threads threads are running.
     *
     * Does not lock : handle() calls it while holding the mutex.
     *
     * @return bool
     */
    protected function hasFreeSlot()
    {
        $threads = $this->getThreads();
        $res = ($threads < $this->max_threads);
        MTLog::getInstance()->debug('Has free slot ? => #running threads = ' . $threads);
        return $res;
    }

    /**
     * Assign a free slot : the first slot that is unset or false is marked as used.
     *
     * @param bool $lock true to wrap the read-modify-write in lock() / unlock()
     * @return int|null index of the assigned slot, null if no free slot is available
     */
    protected function requestSlot($lock = false)
    {
        MTLog::getInstance()->debug('Requesting slot ... ');
        $slot = null;
        if ($lock) $this->lock();
        $slots = shm_get_var($this->shared_data, self::_INDEX_SLOTS);
        for ($i = 0; $i < $this->max_threads; $i++) {
            // empty() is true both for a missing entry and for an entry equal to false
            if (empty($slots[$i])) {
                $slots[$i] = true;
                $slot = $i;
                break;
            }
        }
        shm_put_var($this->shared_data, self::_INDEX_SLOTS, $slots);
        if ($lock) $this->unlock();
        if (is_null($slot)) {
            MTLog::getInstance()->debug('no free slots !!');
        } else {
            MTLog::getInstance()->debug('slot ' . $slot . ' found.');
        }
        return $slot;
    }

    /**
     * Release given slot.
     *
     * @param int  $slot index of the slot to mark as free
     * @param bool $lock true to wrap the read-modify-write in lock() / unlock()
     * @return bool always true
     */
    protected function releaseSlot($slot, $lock = false) {
        if ($lock) $this->lock();
        $slots = shm_get_var($this->shared_data, self::_INDEX_SLOTS);
        $slots[$slot] = false;
        shm_put_var($this->shared_data, self::_INDEX_SLOTS, $slots);
        if ($lock) $this->unlock();
        MTLog::getInstance()->debug('Releasing slot ' . $slot);
        return true;
    }

}


 
  Advertise on this site Advertise on this site   Site map Site map   Statistics Statistics   Site tips Site tips   Privacy policy Privacy policy   Contact Contact  

For more information send a message to :
info at phpclasses dot org.
Copyright (c) Icontem 1999-2009 PHP Classes - PHP Class Scripts
  PHP Book Reviews - Reviews of books and other products