Login   Register  
PHP Classes
elePHPant
Icontem

File: include/class.MTDaemon.php

Recommend this page to a friend!
Stumble It! Stumble It! Bookmark in del.icio.us Bookmark in del.icio.us
  Classes of Killerwhile  >  Multi Threaded Daemon  >  include/class.MTDaemon.php  >  Download  
File: include/class.MTDaemon.php
Role: Class source
Content type: text/plain
Description: Main core class
Class: Multi Threaded Daemon
Manage the execution of parallel threads
Author: By
Last change: moving into the directory include/
Date: 6 years ago
Size: 14,559 bytes
 

Contents

Class file image Download
<?php

/**
 * MultiThreaded Daemon (MTD)
 * 
 * Copyright (c) 2007, Benoit Perroud
 * 
 * All rights reserved.
 * 
 * Redistribution and use in source and binary forms, with or
 * without modification, are permitted provided that the following
 * conditions are met: Redistributions of source code must retain the
 * above copyright notice, this list of conditions and the following
 * disclaimer. Redistributions in binary form must reproduce the above
 * copyright notice, this list of conditions and the following disclaimer
 * in the documentation and/or other materials provided with the
 * distribution.
 * 
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 * @package     MTD
 * @author      Benoit Perroud <ben@migtechnology.ch>
 * @copyright   2007 Benoit Perroud
 * @license     http://www.gnu.org/copyleft/gpl.html
 * @version     $Id: class.MTDaemon.php 8 2007-10-19 09:16:00Z killerwhile $
 *
 * See http://code.google.com/p/phpmultithreadeddaemon/ 
 * and http://phpmultithreaddaemon.blogspot.com/ for more information
 *
 */

require_once dirname(__FILE__) . '/class.MTLog.php';

abstract class MTDaemon 
{

    /*
     * Configuration vaiables
     */

    // max concurrent threads
    protected $max_threads = 4; // should be implemented with sem_get('name', $max_aquire) but this can't be dynamically updated as this var.

    // sleep time when no job
    protected $idle_sleep_time = 5; 

    /*
     * Internal constants
     */
    const _INDEX_DATA = 0;    // shared data goes here.
    const _INDEX_THREADS = 1; // how many threads are working
    const _INDEX_SLOTS = 2;   // which slot do the current thread use ?

    /*
     * Internal variables
     */
    protected $shm;                         // = ftok(__FILE__, 'g');
    protected $shared_data;                 // = shm_attach($this->shm);

    protected $mutex;                       // lock critical path, used in lock() and unlock()
    protected $mutex_main_process;          // lock main process only. Children can continue to run
    protected $mutex_children_processes;    // lock children processes only. Main can continue to run

    protected $main_thread_pid;

    /*
     * Constructor
     * 
     * @params $threads : number of concurrent threads, default 4
     * @params $idelsleeptime : time to sleep when no job ready (getNext return null), in seconds, default 5 
     */
    public function __construct($threads = null, $idlesleeptime = null)
    {
        if ($threads) $this->max_threads = $threads;
        if ($idlesleeptime) $this->idle_sleep_time = $idlesleeptime;
        $this->main_thread_pid = posix_getpid();
    }

    /*
     * Hook called just before the main loop.
     * 
     * Remark : cleanup code goes here.
     */
    protected function _prerun()
    {
        MTLog::getInstance()->info('Starting daemon with ' . $this->max_threads . ' slots');

        $this->shm                      = ftok(__FILE__, 'g'); // global shm
        $this->shared_data              = shm_attach($this->shm);

        $this->mutex                    = sem_get($this->shm);
        $this->mutex_main_process       = sem_get(ftok(__FILE__, 'm'));
        $this->mutex_children_processes = sem_get(ftok(__FILE__, 'c'));

        shm_put_var($this->shared_data, self::_INDEX_DATA, array());

        $this->setThreads(0);

        $slots = array();
        for ($i = 0; $i < $this->max_threads; $i++) {
            $slots[] = false;
        }
        shm_put_var($this->shared_data, self::_INDEX_SLOTS, $slots);
    }

    /*
     * Hook called just after the main loop
     */
    protected function _postrun()
    {
        MTLog::getInstance()->info('Stopping daemon. ');

        shm_remove($this->shared_data);
        sem_remove($this->mutex);
        sem_remove($this->mutex_process);
    }

    /*
     * Main loop, request next job using getNext() and execute run($job) in a separate thread
     * _prerun and _postrun hooks are called before and after the main loop -> usefull for cleanup and so on.
     */
    public function handle()
    {

        $this->run = true;

        $this->_prerun();

        while ($this->run) {

            /* 
             * Terminating all child, to not let some zombie leaking the memory.
             */

            MTLog::getInstance()->debug2('-- Next iteration ');

            $this->lock();

            // HACK : avoid zombie and free earlier the memory
            do {
                $res = pcntl_wait($status, WNOHANG);
                MTLog::getInstance()->debug2('$res = pcntl_wait($status, WNOHANG); called with $res = ' . $res);
                if ($res > 0) MTLog::getInstance()->debug('(finishing child with pid ' . $res . ')');
            } while ($res > 0);

            /*
             * Loop until a slot frees 
             */
            while (!$this->hasFreeSlot()) {
                $this->unlock();
                MTLog::getInstance()->debug('No more free slot, waiting');
                $res = pcntl_wait($status); // wait until a child ends up
                MTLog::getInstance()->debug2('$res = pcntl_wait($status); called with $res = ' . $res);
                if ($res > 0) {
                    MTLog::getInstance()->debug('Finishing child with pid ' . $res);
                } else {
                    MTLog::getInstance()->error('Outch1, this souldn\'t happen. Verify your implementation ...');
                    $this->run = false;
                    continue;
                }
                $this->lock();
            }

            $slot = $this->requestSlot();
            $this->incThreads();

            $this->unlock();

            if ($slot === null) {
                var_dump(shm_get_var($this->shared_data, self::_INDEX_DATA));
                var_dump(shm_get_var($this->shared_data, self::_INDEX_THREADS));
                var_dump(shm_get_var($this->shared_data, self::_INDEX_SLOTS));

                MTLog::getInstance()->error('Outch2, this souldn\'t happen. Verify your implementation ...');
                $this->run = false;
                continue;
            }
            
            /*
             * Request next action to handle
             */
            $next = $this->getNext($slot);
            
            /*
             * If no job
             */
            if (!$next) {

                MTLog::getInstance()->debug('No job, sleeping at most ' . $this->idle_sleep_time . ' sec ... ');

// TODO : waiting for signal pushed into a queue when inserting a new job.

                $this->lock();
                $this->releaseSlot($slot);
                $this->decThreads();
                $this->unlock();

                sleep($this->idle_sleep_time);

                continue;
                
            } else { 

                $pid = pcntl_fork();

                if ($pid == -1) {
                        MTLog::getInstance()->error('[fork] Duplication impossible');
                        $this->run = false;
                        continue;
                } else if ($pid) {

                        usleep(10); // HACK : give the hand to the child -> a simple way to better handle zombies

                        continue;
                } else {

                    MTLog::getInstance()->debug('Executing thread #' . posix_getpid() . ' in slot ' . number_format($slot));

                    $res = $this->run($next, $slot);

                    $this->lock();
                    $this->releaseSlot($slot);
                    $this->decThreads();
                    $this->unlock();

                    exit($res);

                }
            }
        }

        $this->_postrun();

    }

    /*
     * Request data of the next element to run in a thread
     * 
     * return null or false if no job currently
     */
    abstract public function getNext($slot);

    /*
     * Process the element fetched by getNext in a new thread
     * 
     * return the exiting status of the thread
     */
    abstract public function run($next, $slot);

    /*
     *
     */
    protected function lock()
    {
        MTLog::getInstance()->debug2('[lock] lock');
        $res = sem_acquire($this->mutex);
        if (!$res) exit(-1);
    }

    /*
     *
     */
    protected function unlock()
    {
        MTLog::getInstance()->debug2('[lock] unlock');
        $res = sem_release($this->mutex);
        if (!$res) exit(-1);
    }

    /*
     *
     */
    protected function lockMain()
    {
        MTLog::getInstance()->debug2('[lock] lock main process');
        $res = sem_acquire($this->mutex_main_process);
        if (!$res) exit(-1);
    }

    /*
     *
     */
    protected function unlockMain()
    {
        MTLog::getInstance()->debug2('[lock] unlock main process');
        $res = sem_release($this->mutex_main_process);
        if (!$res) exit(-1);
    }

    /*
     *
     */
    protected function lockChildren()
    {
        MTLog::getInstance()->debug2('[lock] lock children processes');
        $res = sem_acquire($this->mutex_children_processes);
        if (!$res) exit(-1);
    }

    /*
     *
     */
    protected function unlockChildren()
    {
        MTLog::getInstance()->debug2('[lock] unlock children processes');
        $res = sem_release($this->mutex_children_processes);
        if (!$res) exit(-1);
    }

    /*
     * Get a shared var based on hash.
     *
     * Return null if the var doesn't exist.
     */
    protected function getVar($name, $lock = false)
    {
        if ($lock) $this->lock();
        $vars = shm_get_var($this->shared_data, self::_INDEX_DATA);
        $value = (isset($vars[$name])) ? $vars[$name] : null;
        if ($lock) $this->unlock();
        return $value;
    }

    /*
     * Set a shared var.
     * 
     * Remark : the var should be serialized.
     */
    protected function setVar($name, $value, $lock = false)
    {
        if ($lock) $this->lock();
        $vars = shm_get_var($this->shared_data, self::_INDEX_DATA);
        $vars[$name] = $value;
        $res = shm_put_var($this->shared_data, self::_INDEX_DATA, $vars);
        if ($lock) $this->unlock();
        return $res;
    }

    /*
     * Get the number of running threads
     */
    protected function getThreads($lock = false)
    {
        if ($lock) $this->lock();
        $res = shm_get_var($this->shared_data, self::_INDEX_THREADS);
        if ($lock) $this->unlock();
        return $res;
    }

    /*
     * Set the number of running threads
     */    
    protected function setThreads($threads, $lock = false)
    {
        if ($lock) $this->lock();
        $res = shm_put_var($this->shared_data, self::_INDEX_THREADS, $threads);
        if ($lock) $this->unlock();
        return $res;
    }

    /*
     * Increment the number of running threads
     */
    protected function incThreads($lock = false)
    {
        if ($lock) $this->lock();
        $threads = $this->getThreads();
        $res = shm_put_var($this->shared_data, self::_INDEX_THREADS, $threads + 1);
        MTLog::getInstance()->debug('incThreads, $threads = ' . ($threads + 1));
        if ($lock) $this->unlock();
        return $res;
    }

    /*
     * Decrement the number of running threads
     */
    protected function decThreads($lock = false)
    {
        if ($lock) $this->lock();
        $threads = $this->getThreads();
        $res = shm_put_var($this->shared_data, self::_INDEX_THREADS, $threads - 1);
        MTLog::getInstance()->debug('decThreads, $threads = ' . ($threads - 1));
        if ($lock) $this->unlock();
        return $res;
    }

    /*
     * Return true if any slot is free
     */
    protected function hasFreeSlot()
    {
        $threads = $this->getThreads();
        $res = ($threads < $this->max_threads) ? true : false;
        MTLog::getInstance()->debug('Has free slot ? => #running threads = ' . $threads);
        return $res;
    }

    /*
     * Assign a free slot
     *
     * Return null if no free slot is available
     */
    protected function requestSlot($lock = false)
    {
        MTLog::getInstance()->debug('Requesting slot ... ');
        $slot = null;
        if ($lock) $this->lock();
        $slots = shm_get_var($this->shared_data, self::_INDEX_SLOTS);
        for ($i = 0; $i < $this->max_threads; $i++) {
            if (!isset($slots[$i])) {
                $slots[$i] = true;
                $slot = $i;
                break;
            } else {
                if ($slots[$i] == false) {
                    $slots[$i] = true;
                    $slot = $i;
                    break;
                }
            }
        }
        shm_put_var($this->shared_data, self::_INDEX_SLOTS, $slots);
        if ($lock) $this->unlock();
        if (is_null($slot)) {
            MTLog::getInstance()->debug('no free slots !!');
        } else {
            MTLog::getInstance()->debug('slot ' . $slot . ' found.');
        }
        return $slot;
    }

    /*
     * Release given slot
     */
    protected function releaseSlot($slot, $lock = false) {
        if ($lock) $this->lock();
        $slots = shm_get_var($this->shared_data, self::_INDEX_SLOTS);
        $slots[$slot] = false;
        shm_put_var($this->shared_data, self::_INDEX_SLOTS, $slots);
        if ($lock) $this->unlock();
        MTLog::getInstance()->debug('Releasing slot ' . $slot);
        return true;
    }

}