Login   Register  
PHP Classes
elePHPant
Icontem

File: WebClientService.class.php

Recommend this page to a friend!
Stumble It! Stumble It! Bookmark in del.icio.us Bookmark in del.icio.us
  Classes of Nathan Bruer  >  Web Socket Service  >  WebClientService.class.php  >  Download  
File: WebClientService.class.php
Role: Class source
Content type: text/plain
Description: Web Client process to handle communication between the web client and the server
Class: Web Socket Service
Handle Web socket accesses using child processes
Author: By
Last change: PHP 5.4 changed pass-by-reference to fully depreciate passing by reference to functions on function call.
Date: 2012-12-27 10:19
Size: 15,407 bytes
 

Contents

Class file image Download
<?php
abstract class WebClientService {
	// Magic Hash
	const GUID = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11';
	// This is the max size a header may be received from a client. Default: 128kb
	const MAX_HEADER_SIZE = 0x4000;
	// How long in seconds after not receiving any data to ping client
	const PING_TIMER = 10;
	// Number of seconds to close connection after not receiving a pong.
	const PING_TIMEOUT = 20;
	// How often it will release the blocking so it can pickup on zombie processes and such.
	const BLOCKING_TIMER = 1;
	public $closed = false;
	public $protocol_version = 0;
	public $ip;
	public $port;
	public $socket;
	public $lastMsgTime;
	public $lastPingSent = false;
	public $lastPing;
	public $needsHandShake = true;
	public $master;

	public $currentData = '';

	/**
	 * When data is fully received from the WebSocket connection this function is executed. Extend this function to process the data.
	 * NOTE: you can access the data returned with the variable "$this->currentData"
	 */
	abstract public function dataReceived();
	/**
	* When data is received from the parent process it is processed in this function if it is not a type 'c' and 'i' ('c' means close process, 'i' means specific process relay)
	* @param string $type 1 Character code telling what type of message it is, 'c' is reserved for close connection and 'i' is reserved for specific process relay
	* @param string $data The data passed from the parent process to the child process.
	*/
	abstract public function parentDataRecv($type, $data);

	public function __construct($socket, $master){
		$this->master = $master;
		$this->socket = $socket;
		$this->lastMsgTime = time();
		socket_getpeername($socket, $this->ip, $this->port);
		Console::log("Successfully forked process!");
		Console::log("WebClient: Connection accepted for: $this->ip:$this->port");
		while(true){
			while(($child_id = pcntl_wait($status, WNOHANG)) > 0) // Removes zombie children if they exist
				Console::log("Removed zombie: $child_id");
			if($this->closed)
				return;
			$connections = array($this->master, $this->socket);
			socket_select($connections, $write = null, $except = null, static::BLOCKING_TIMER);
			foreach($connections as $connection){
				if($connection === $this->socket){
					// Is from web socket
					$this->recv();
				}else{
					// Is from parent process
					Console::log("Receiving master data...");
					if($l = socket_recv($connection, $data, 3, MSG_WAITALL)){
						$len = (ord($data{0}) << 16) | (ord($data{1}) << 8) | (ord($data{2}));
						if(!$len)
							continue;
						elseif(socket_recv($connection, $data, $len, MSG_WAITALL) == $len){
							Console::log("Got master's data...");
							$this->parentRecv($data);
						}else{
							// Not enough data
							Console::log("WebSocket: Error not enough data received!");
							$this->close();
							return;
						}
					}else{
						// Connection closed
						Console::log("WebSocket: Parent process connection closed");
						$this->close();
						return;
					}
				}
			}
			$this->check();
		}
	}
	public function parentRecv($data){
		switch($data{0}){
			case WebSocket::CHILD_PROCESS_RESPONSE_CLOSE:
				Console::log("WebSocket: Got close command from parent process");
				$this->close();
				return;
			default:
				Console::log("Received parent data (user defined)");
				$this->parentDataRecv($data{0}, substr($data, 1));
				return;
		}
	}
	public function close(){
		if(!$this->closed){
			Console::log("WebClient: Closing socket $this->socket");
			@socket_close($this->socket);
			@socket_close($this->master);
			$this->closed = true;
		}
	}
	public function recv(){
		if($this->needsHandShake){
			$lastChr = '';
			while(true){
				if(!socket_recv($this->socket, $buff, 1, MSG_WAITALL) || $buff === null){
					Console::log("WebClient: Looks like client '$this->socket' disconnected");
					$this->close();
					return false;
				}
				if($buff === "\r")
					continue;
				elseif($buff === "\n" && $lastChr === "\n"){
					$lastChr = $buff;
					$this->currentData .= $buff;
					break;
				}
				$lastChr = $buff;
				$this->currentData .= $buff;
			}
			$headers = self::getheaders($this->currentData);
			if(!isset($headers['Connection'])){
				Console::log("WebClient: Looks like client '$this->socket' is not a websocket, closing");
				$this->close();
				return false;
			}
			if(isset($headers['Sec-WebSocket-Version'])){
				$this->protocol_version = $headers['Sec-WebSocket-Version'];
				Console::log("WebClient: Client is version {$headers['Sec-WebSocket-Version']}, proceeding to handshake...");
				$responseH  = "HTTP/1.1 101 Switching Protocols\r\n" .
					"Upgrade: websocket\r\n" .
					"Connection: Upgrade\r\n" .
					"Sec-WebSocket-Accept: " . base64_encode(sha1($headers['Sec-WebSocket-Key'] . static::GUID, true)) . "\r\n\r\n";
			}else{
				if(isset($headers['Sec-WebSocket-Key1']) && isset($headers['Sec-WebSocket-Key2'])){
					$this->protocol_version = 0;
					Console::log("WebClient: Client is version 00, proceeding to handshake...");
					if(!socket_recv($this->socket, $code, 8, MSG_WAITALL) || $code === null){
						Console::log("WebClient: Looks like client '$this->socket' disconnected");
						$this->close();
						return false;
					}
					$key1 = preg_match_all('/[0-9]/', $headers['Sec-WebSocket-Key1'], $number) && preg_match_all('/ /', $headers['Sec-WebSocket-Key1'], $space)?implode('', $number[0]) / count($space[0]):'';
					$key2 = preg_match_all('/[0-9]/', $headers['Sec-WebSocket-Key2'], $number) && preg_match_all('/ /', $headers['Sec-WebSocket-Key2'], $space)?implode('', $number[0]) / count($space[0]):'';
					$hash = md5(pack('N', $key1).pack('N', $key2).$code, true);
					$responseH = "HTTP/1.1 101 WebSocket Protocol Handshake\r\n".
							"Upgrade: WebSocket\r\n".
							"Connection: Upgrade\r\n".
							"Sec-WebSocket-Origin: {$headers['Origin']}\r\n".
							"Sec-WebSocket-Location: ws://{$headers['Host']}{$headers['uri']}\r\n".
							"\r\n".
							$hash;
				}else{
					Console::log("WebClient: Looks like client '$this->socket' has version: {$headers['Sec-WebSocket-Version']} which is not a supported websocket version, closing");
					$this->close();
					return false;
				}
			}
			socket_write($this->socket, $responseH);
			$this->needsHandShake = false;
			$this->currentData = '';
		}else{
			if($this->protocol_version == 0){
				$l = socket_recv($this->socket, $buff, 1, MSG_WAITALL);
				$last_error = socket_last_error($this->socket);
				if(!$l || !($last_error == 105 || $last_error == 0)){
					socket_clear_error($this->socket);
					Console::log("WebClient: Looks like client '$this->socket' disconnected");
					$this->close();
					return false;
				}
				if($buff !== "\x00"){
					Console::log("WebClient: Client did not append a 0xFF byte to head of request");
					$this->close();
					return false;
				}
				$this->currentData = '';
				while(true){
					$l = socket_recv($this->socket, $buff, 1, MSG_WAITALL);
					$last_error = socket_last_error($this->socket);
					if(!$l || !($last_error == 105 || $last_error == 0)){
						socket_clear_error($this->socket);
						Console::log("WebClient: Looks like client '$this->socket' disconnected");
						$this->close();
						return false;
					}
					if($l){
						$this->lastMsgTime = time();
						$this->lastPingSent = null;
					}
					if($buff === "\xFF"){
						if(!$this->currentData){
							Console::log("WebClient: Received pong from: $this->socket");
							$this->currentData = '';
							return;
						}
						$this->dataReceived();
						$this->currentData = '';
						break;
					}
					$this->currentData .= $buff;
				}
			}elseif($this->protocol_version != 0){
				$l = socket_recv($this->socket, $buff, 2048, MSG_DONTWAIT);
				$last_error = socket_last_error($this->socket);
				if(!$l || !($last_error == 105 || $last_error == 0)){
					socket_clear_error($this->socket);
					Console::log("WebClient: Looks like client '$this->socket' disconnected");
					$this->close();
					return false;
				}
				if($l){
					$this->lastMsgTime = time();
					$this->lastPingSent = null;
				}
				$this->currentData .= $buff;
				// Not enough data to do anything with
				if(strlen($this->currentData) < 2)
					return;
				static $step, $firstChr, $secondChr, $masked, $dataLen, $maskIndex, $dataIndex, $mask;
				while(true){
					switch($step){
						// Step 0 checks data length and decides what length type to use
						case 0:
							$firstChr = $this->currentData{0};
							$secondChr = $this->currentData{1};
							$masked = (bool) ($secondChr & chr(0x80));
							$dataLen = ord($secondChr & chr(0x7F));
							if($dataLen >= 127){
								$step = 1;
								continue 2;
							}elseif($dataLen >= 126){
								$step = 2;
								continue 2;
							}else{
								$step = 3;
								$maskIndex = 2;
								continue 2;
							}
						// If length is for 64bit type.
						case 1:
							if(strlen($this->currentData) < 10)
								return;
							$dataLen = (ord($this->currentData{2}) << 54) | (ord($this->currentData{3}) << 48) | (ord($this->currentData{4}) << 40) | (ord($this->currentData{5}) << 32) | (ord($this->currentData{6}) << 24) | (ord($this->currentData{7}) << 16) | (ord($this->currentData{8}) << 8) | ord($this->currentData{9});
							$maskIndex = 10;
							$step = 3;
							continue 2;
						// If length is 16bit type
						case 2:
							if(strlen($this->currentData) < 4)
								return;
							$dataLen = (ord($this->currentData{2}) << 8) | ord($this->currentData{3});
							$maskIndex = 4;
							$step = 3;
							continue 2;
						// Gets the mask if there is one
						case 3:
							// Masking
							if($masked){
								if(strlen($this->currentData) < $maskIndex + 4)
									return;
								$mask = substr($this->currentData, $maskIndex, 4);
								$dataIndex = $maskIndex + 4;
							}else{
								$dataIndex = $maskIndex;
							}
							$step = 4;
							continue 2;
						// Retreives the data
						case 4:
							if($dataLen + $dataIndex > strlen($this->currentData))
								return;
							$firstChrInt = ord($firstChr);
							$optCodes = $firstChrInt & 0xF;
							if($optCodes == 0xA){
								// Received Pong
								//Console::log("WebClient: Received pong from: $this->socket");
								$this->currentData = '';
								$step = 0;
							}elseif($optCodes == 0x9){
								// Received Ping Request
								socket_write($this->socket, chr(0x8A) . chr(128) . chr(mt_rand(1, 255)) . chr(mt_rand(1, 255)) . chr(mt_rand(1, 255)) . chr(mt_rand(1, 255)));
								Console::log("WebClient: Received ping request responding with pong from: $this->socket");
								$this->currentData = '';
								$step = 0;
							}elseif($optCodes == 0x8){
								// Received Close Request
								Console::log("WebClient:: Received close request from: $this->socket");
								$this->close();
								return;
							}else{
								if($masked)
									for($i=0;$i<$dataLen;$i++)
										$this->currentData{$i + $dataIndex} = $this->currentData{$i + $dataIndex} ^ $mask{$i % 4};
								$this->currentData = substr($this->currentData, $dataIndex);
								$this->dataReceived();
								$this->currentData = '';
								$step = 0;
							}
							break 2;
						default:
							$step = 0;
							continue 2;
					}
				}
			}
		}
	}
	public function ping(){
		if($this->protocol_version != 0)
			@socket_write($this->socket, chr(0x89) . chr(128) . chr(mt_rand(1, 255)) . chr(mt_rand(1, 255)) . chr(mt_rand(1, 255)) . chr(mt_rand(1, 255)));
		else
			if(@socket_write($this->socket, "\x00\xFF") === false){
				Console::log("Failed to send ping request");
				$this->close();
				return;
			}
	}
	public function check(){
		if(!$this->lastPingSent && $this->lastMsgTime + static::PING_TIMER < time()){
			$this->lastPingSent = time();
			//Console::log("WebSocket: Sending Ping to client on $this->socket");
			$this->ping();
			
		}elseif($this->lastPingSent && $this->lastPingSent + static::PING_TIMEOUT < time() && $this->lastMsgTime + static::PING_TIMEOUT < time()){
			Console::log("WebSocket: Ping not received for ". static::PING_TIMEOUT . " seconds on $this->socket");
			$this->close();
		}
	}
	public function fork(){
		$id = pcntl_fork();
		if($id == -1){
			return false;
		}elseif($id)
			// parent
			return false;
		else
			// child
			set_time_limit(30);
			return true;
	}
	public function sendToParent($type, $data, array $pids = array()){
		if($pids)
			// Relay to all processes
			$data = 'i'.implode(',', $pids).':'.$type.$data;
		else
			$data = $type.$data;
		$i = 0;
		$length = strlen($data);
		if($length > 0xFFFFFF){
			Console::log("Failed to send packet to master, 16MB limit hit");
			return false;
		}
		for($i=0;$i<3;$i++)
			$data = chr(($length >> ($i * 8)) & 0xFF).$data;
		$length = strlen($data);
		do{
			if(($d = @socket_send($this->master, $data, strlen($data), 0)) === false){
				Console::log("WebSocket: Failed to send data to: $this->master");
				$this->close();
				return false;
			}
			if(!$d)
				usleep(25);
			$i += $d;
		}while($length > $i);
		return true;
	}
	public function send($data, $encode = true){
		// Fork over to new process to send the data to keep blocking from consumeing the entire service.
		if(!$this->fork())
			return;
		$i = 0;
		if($encode)
			$data = $this->encodeData($data);
		$length = strlen($data);
		do{
			if(($d = @socket_write($this->socket, $da = substr($data, $i), strlen($da))) === false){
				Console::log("WebSocket: Failed to send data to: $this->socket");
				$this->close();
				break;
			}
			if(!$d)
				usleep(25);
			$i += $d;
		}while($length > $i);
		exit;
	}
	public static function getheaders($header){
		preg_match("/GET (.+) HTTP\\/([0-9]+\\.[0-9]+)[\r\n]+/", $header, $match);
		$return = array(
			'uri' => $match[1],
			'http-version' => $match[2]
		);
		preg_match_all("/([^:]+): ([^\n\r]*)[\r\n]+/", $header, $matches, PREG_SET_ORDER);
		foreach($matches as $match)
			$return[$match[1]] = $match[2];
		return $return;
	}
	public function encodeData($data){
		if($this->protocol_version != 0){
			$mask = chr(mt_rand(1, 255)) . chr(mt_rand(1, 255)) . chr(mt_rand(1, 255)) . chr(mt_rand(1, 255));
			$frame = chr(0x81);
			$dataLength = strlen($data);

			if($dataLength <= 125)
				$frame .= chr($dataLength | 128);
			elseif($dataLength < 0xFFFF){
				$frame .= chr(254);
				$frame .= chr($dataLength >> 8);
				$frame .= chr($dataLength & 0xFF);
			}else{
				$frame .= chr(255);
				$frame .= chr($dataLength >> 0x30 & 0xFF);
				$frame .= chr($dataLength >> 0x28 & 0xFF);
				$frame .= chr($dataLength >> 0x20 & 0xFF);
				$frame .= chr($dataLength >> 0x18 & 0xFF);
				$frame .= chr($dataLength >> 0x10 & 0xFF);
				$frame .= chr($dataLength >> 0x8 & 0xFF);
				$frame .= chr($dataLength & 0xFF);
			}
			$frame .= $mask;
			for($i = 0; $i < strlen($data); $i++)
				$frame .= $data[$i] ^ $mask{$i % 4};
			return $frame;
		}elseif($this->protocol_version == 0){
			return "\x00$data\xFF";
		}
	}
}