You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							1098 lines
						
					
					
						
							37 KiB
						
					
					
				
			
		
		
		
			
			
			
				
					
				
				
					
				
			
		
		
	
	
							1098 lines
						
					
					
						
							37 KiB
						
					
					
				
								<?php
							 | 
						|
								declare(strict_types=1);
							 | 
						|
								
							 | 
						|
								namespace App\Libs;
							 | 
						|
								
							 | 
						|
								class MQTTClient {
							 | 
						|
								
							 | 
						|
								    // MQTT control packet types (here left shifted 4 bits)
							 | 
						|
								    const MQTT_CONNECT     = 0x10; // Client request to connect to Server
							 | 
						|
								    const MQTT_CONNACK     = 0x20; // Connect acknowledgment
							 | 
						|
								    const MQTT_PUBLISH     = 0x30; // Publish message
							 | 
						|
								    const MQTT_PUBACK      = 0x40; // Publish acknowledgment
							 | 
						|
								    const MQTT_PUBREC      = 0x50; // Publish received (assured delivery part 1)
							 | 
						|
								    const MQTT_PUBREL      = 0x62; // Publish release (assured delivery part 2)
							 | 
						|
								    const MQTT_PUBCOMP     = 0x70; // Publish complete (assured delivery part 3)
							 | 
						|
								    const MQTT_SUBSCRIBE   = 0x80; // Client subscribe request
							 | 
						|
								    const MQTT_SUBACK      = 0x90; // Subscribe acknowledgment
							 | 
						|
								    const MQTT_UNSUBSCRIBE = 0xa0; // Unsubscribe request
							 | 
						|
								    const MQTT_UNSUBACK    = 0xb0; // Unsubscribe acknowledgment
							 | 
						|
								    const MQTT_PINGREQ     = 0xc0; // PING request
							 | 
						|
								    const MQTT_PINGRESP    = 0xd0; // PING response
							 | 
						|
								    const MQTT_DISCONNECT  = 0xe0; // Client is disconnecting
							 | 
						|
								
							 | 
						|
								    // MQTT quality of service levels
							 | 
						|
								    const MQTT_QOS0 = 0x00;
							 | 
						|
								    const MQTT_QOS1 = 0x01;
							 | 
						|
								    const MQTT_QOS2 = 0x02;
							 | 
						|
								
							 | 
						|
								    // MQTT status on last read from stream
							 | 
						|
								    const READ_STATUS_NO_READ = 0;
							 | 
						|
								    const READ_STATUS_OK = 200;
							 | 
						|
								    const READ_STATUS_ERROR = 400;
							 | 
						|
								    const READ_STATUS_ERROR_HEADER = 401;
							 | 
						|
								    const READ_STATUS_ERROR_PACKETLENGTH = 402;
							 | 
						|
								    const READ_STATUS_ERROR_PAYLOAD = 403;
							 | 
						|
								
							 | 
						|
								    private $socket = null;           // Socket resource reference
							 | 
						|
								    private $socketTimeout;           // Default socket timeout in milliseconds
							 | 
						|
								    private $socketReadDelay = 1000;  // Delay in milliseconds between read attempts on socket
							 | 
						|
								
							 | 
						|
								    private $protocol = 'tcp';
							 | 
						|
								    private $serverAddress;
							 | 
						|
								    private $serverPort;
							 | 
						|
								    private $clientId;
							 | 
						|
								
							 | 
						|
								    public $caFile = null;
							 | 
						|
								    public $localCert = null;
							 | 
						|
								    public $localPrivateKey = null;
							 | 
						|
								
							 | 
						|
								    private $connectCleanSession;
							 | 
						|
								    private $connectKeepAlive;
							 | 
						|
								    private $connectWill = false;
							 | 
						|
								    private $connectWillQos;
							 | 
						|
								    private $connectWillRetain;
							 | 
						|
								    private $connectUsername;
							 | 
						|
								    private $connectPassword;
							 | 
						|
								
							 | 
						|
								    private $willTopic;
							 | 
						|
								    private $willMessage;
							 | 
						|
								
							 | 
						|
								    private $packetId;         // Packet identifier to validate return packets
							 | 
						|
								    private $pingReqTime;      // Time when last PINGREQ was sent
							 | 
						|
								    private $serverAliveTime;  // Time for last response from server
							 | 
						|
								
							 | 
						|
								    private $debug = false;
							 | 
						|
								    private $lastReadStatus = self::READ_STATUS_NO_READ;
							 | 
						|
								
							 | 
						|
								    private $packetQueue = []; // Queue for received but unhandled packages
							 | 
						|
								
							 | 
						|
								    public $lastConnectResult = 0;
							 | 
						|
								
							 | 
						|
								    /**
								     * Class constructor - stores the connection parameters
								     *
								     * The parameters are handed to setConnection(); its boolean result is
								     * not acted upon here. Afterwards a pseudo-random start value is chosen
								     * for the packet identifier.
								     *
								     * @param string $address Address to server
								     * @param string $port Port to server
								     * @param string $protocol Which protocol to use
								     */
								    function __construct($address, $port=null, $protocol='tcp'){
								        // Return value deliberately unused; setConnection() stores the values itself
								        $this->setConnection($address, $port, $protocol);

								        // Random multiple of 100 (100..10000): reduces the risk of creating
								        // duplicate ids in sequential sessions
								        $seed = rand(1,100);
								        $this->packetId = $seed * 100;
								    }
							 | 
						|
								
							 | 
						|
								    /**
								     * Class destructor
								     *
								     * Delegates to close() when the object is destroyed.
								     */
								    function __destruct(){
								        $this->close();
								    }
							 | 
						|
								
							 | 
						|
								    /**
								     * Set up connection parameters
								     *
								     * Address and port are stored as given. The protocol is lower-cased and
								     * accepted when it is 'tcp' or one of the supported encryption protocols;
								     * anything else is replaced by 'tcp'.
								     *
								     * @param string $address
								     * @param string $port
								     * @param string $protocol
								     * @return boolean False when the protocol failed validation and 'tcp' was used instead
								     */
								    function setConnection($address, $port=null, $protocol='tcp'){
								        $this->serverAddress = $address;
								        $this->serverPort = $port;

								        $requested = strtolower($protocol);
								        $isSupported = ($requested == 'tcp') || self::isEncryptionProtocol($requested);

								        if ($isSupported) {
								            $this->protocol = $requested;
								            return true;
								        }

								        // Unknown protocol: report it and fall back to plain tcp
								        $this->debugMessage('Invalid protocol ('.$requested.'). Setting to default (tcp).');
								        $this->protocol = 'tcp';
								        return false;
								    }
							 | 
						|
								
							 | 
						|
								    /**
								     * Build url for connecting to stream
								     *
								     * Format: [protocol://]address[:port]. The protocol and port parts are
								     * left out when the corresponding property is empty.
								     *
								     * @return string
								     */
								    private function getUrl() {
								        $scheme = $this->protocol ? $this->protocol .'://' : '';
								        $portSuffix = $this->serverPort ? ':'. $this->serverPort : '';

								        return $scheme . $this->serverAddress . $portSuffix;
								    }
							 | 
						|
								
							 | 
						|
								    /**
								     * Check if encryption protocol is supported
								     *
								     * The comparison is case-insensitive. Non-string input (for example the
								     * null default used by setEncryption()) is reported as unsupported rather
								     * than being handed to strtolower(), which would raise a TypeError under
								     * strict_types.
								     *
								     * @param string|null $protocol
								     * @return boolean
								     */
								    private static function isEncryptionProtocol($protocol) {
								        if (!is_string($protocol)) {
								            return false;
								        }

								        $supported = ['ssl', 'tls', 'tlsv1.0', 'tlsv1.1', 'tlsv1.2', 'sslv3'];

								        return in_array(strtolower($protocol), $supported, true);
								    }
							 | 
						|
								
							 | 
						|
								    /**
								     * Sets server certificate and protocol for ssl/tls encryption
								     *
								     * Both arguments are validated before anything is stored, so a failed call
								     * leaves the current settings untouched. When $protocol is null the
								     * currently configured protocol is kept. An accepted protocol is stored
								     * lower-cased, the same way setConnection() stores it.
								     *
								     * @param string $caFile CA file to identify server
								     * @param string|null $protocol Crypto protocol (See http://php.net/manual/en/migration56.openssl.php)
								     * @return boolean False if settings failed, else true
								     */
								    public function setEncryption($caFile, $protocol = null) {
								        if (!file_exists($caFile)) {
								            $this->debugMessage('CA file not found');
								            return false;
								        }

								        $normalized = null;
								        if ($protocol !== null) {
								            // Never hand a non-string to strtolower(): that is a TypeError under strict_types
								            $normalized = is_string($protocol) ? strtolower($protocol) : '';
								            if (!self::isEncryptionProtocol($normalized)) {
								                $this->debugMessage('Unknown encryption protocol');
								                return false;
								            }
								        }

								        $this->caFile = $caFile;
								        if ($normalized !== null) {
								            $this->protocol = $normalized;
								        }
								        return true;
								    }
							 | 
						|
								
							 | 
						|
								    /**
								     * Sets client certificate and key files for client-side authentication
								     *
								     * The certificate is checked first, then the key. Nothing is stored
								     * unless both files exist.
								     *
								     * @param string $certificateFile Client certificate file
								     * @param string $keyFile Client key file
								     * @return boolean False if settings failed, else true
								     */
								    public function setClientEncryption($certificateFile, $keyFile) {
								        // Checked in this order; the first missing file ends the call
								        $requiredFiles = [
								            'Client certificate file not found' => $certificateFile,
								            'Client key file not found' => $keyFile,
								        ];
								        foreach ($requiredFiles as $missingMessage => $path) {
								            if (!file_exists($path)) {
								                $this->debugMessage($missingMessage);
								                return false;
								            }
								        }

								        $this->localCert = $certificateFile;
								        $this->localPrivateKey = $keyFile;
								        return true;
								    }
							 | 
						|
								
							 | 
						|
								    /**
								     * Store the credentials that sendConnect() puts into the CONNECT packet
								     *
								     * @param string $username Username
								     * @param string $password Password
								     */
								    public function setAuthentication($username, $password) {
								        $this->connectPassword = $password;
								        $this->connectUsername = $username;
								    }
							 | 
						|
								
							 | 
						|
								    /**
								     * Set will (last message defined by MQTT) to send when connection is lost
								     *
								     * Stores the will topic, message, QoS and retain setting and marks the
								     * will as enabled; sendConnect() reads these values when it builds the
								     * CONNECT packet.
								     *
								     * @param string $topic
								     * @param string $message
								     * @param integer $qos
								     * @param boolean $retain
								     */
								    public function setWill($topic, $message, $qos=1, $retain=false) {
								        // What to publish
								        $this->willTopic = $topic;
								        $this->willMessage = $message;

								        // How to publish it
								        $this->connectWillQos = $qos;
								        $this->connectWillRetain = $retain;

								        $this->connectWill = true;
								    }
							 | 
						|
								
							 | 
						|
								    /**
								     * Connect to MQTT server
								     *
								     * Validates the client id, opens the socket (with an SSL context when an
								     * encryption protocol is configured), sends a CONNECT packet and waits for
								     * the CONNACK. $lastConnectResult receives the CONNACK return code, or -1
								     * when no usable CONNACK was received.
								     *
								     * @param string $clientId Unique id used by the server to identify the client
								     * @param boolean $cleanSession Set true to clear session on server, ie queued messages are purged (not received)
								     * @param integer $keepAlive Number of seconds a connection is considered to be alive without traffic
								     * @param integer $timeout Number of milliseconds before timeout when reading from socket
								     * @return boolean Returns false if connection failed
								     */
								    public function sendConnect($clientId, $cleanSession=false, $keepAlive=10, $timeout=5000) {

								        if (!$this->serverAddress) return false;

								        // Basic validation of clientid
								        // Note: A MQTT server may accept other chars and more than 23 chars in the clientid but that is optional,
								        // all chars below up to 23 chars are required to be accepted (see section "3.1.3.1 Client Identifier" of the standard)
								        if(preg_match("/[^0-9a-zA-Z]/",$clientId)) {
								            $this->debugMessage('ClientId can only contain characters 0-9,a-z,A-Z');
								            return false;
								        }
								        if(strlen($clientId) > 23) {
								            $this->debugMessage('ClientId max length is 23 characters/numbers');
								            return false;
								        }
								        $this->clientId = $clientId;

								        $this->connectCleanSession = $cleanSession;
								        $this->connectKeepAlive = $keepAlive;
								        $this->socketTimeout = $timeout;

								        // Setup certificates if encryption protocol selected
								        if (self::isEncryptionProtocol($this->protocol)) {
								            // NOTE(review): the list still offers RC4 suites - consider pruning them
								            $mozillaCiphers = implode(':', [
								                'ECDHE-RSA-AES128-GCM-SHA256',
								                'ECDHE-ECDSA-AES128-GCM-SHA256',
								                'ECDHE-RSA-AES256-GCM-SHA384',
								                'ECDHE-ECDSA-AES256-GCM-SHA384',
								                'DHE-RSA-AES128-GCM-SHA256',
								                'DHE-DSS-AES128-GCM-SHA256',
								                'kEDH+AESGCM',
								                'ECDHE-RSA-AES128-SHA256',
								                'ECDHE-ECDSA-AES128-SHA256',
								                'ECDHE-RSA-AES128-SHA',
								                'ECDHE-ECDSA-AES128-SHA',
								                'ECDHE-RSA-AES256-SHA384',
								                'ECDHE-ECDSA-AES256-SHA384',
								                'ECDHE-RSA-AES256-SHA',
								                'ECDHE-ECDSA-AES256-SHA',
								                'DHE-RSA-AES128-SHA256',
								                'DHE-RSA-AES128-SHA',
								                'DHE-DSS-AES128-SHA256',
								                'DHE-RSA-AES256-SHA256',
								                'DHE-DSS-AES256-SHA',
								                'DHE-RSA-AES256-SHA',
								                'AES128-GCM-SHA256',
								                'AES256-GCM-SHA384',
								                'ECDHE-RSA-RC4-SHA',
								                'ECDHE-ECDSA-RC4-SHA',
								                'AES128',
								                'AES256',
								                'RC4-SHA',
								                'HIGH',
								                '!aNULL',
								                '!eNULL',
								                '!EXPORT',
								                '!DES',
								                '!3DES',
								                '!MD5',
								                '!PSK'
								            ]);
								            // Secure socket communication with these parameters, a ca-file is required
								            $options = [];
								            $options['verify_peer'] = true;
								            $options['verify_peer_name'] = true;
								            $options['verify_depth'] = 5;
								            $options['disable_compression'] = true;
								            $options['SNI_enabled'] = true;
								            $options['ciphers'] = $mozillaCiphers;
								            if($this->caFile) {
								                $options['cafile'] = $this->caFile;
								            }
								            if($this->localCert) {
								                $options['local_cert'] = $this->localCert;
								                if ($this->localPrivateKey) {
								                    $options['local_pk'] = $this->localPrivateKey;
								                }
								            }
								            $socketContext = stream_context_create(['ssl' => $options]);
								            $this->debugMessage('Settings socket options: '. var_export($options, true));
								        } else {
								            $socketContext = null;
								        }

								        // Try to open socket
								        try {
								            $this->debugMessage('Opening socket to: '. $this->getUrl());
								            if ($socketContext) {
								                $this->socket = stream_socket_client($this->getUrl(), $errno, $errstr, 10, STREAM_CLIENT_CONNECT, $socketContext);
								            } else {
								                $this->socket = stream_socket_client($this->getUrl(), $errno, $errstr, 10, STREAM_CLIENT_CONNECT);
								            }
								        } catch (\ErrorException $error) {
								            $this->debugMessage('Exception: Could not open stream with error message: '. $error->getMessage());
								            $this->socket = null;
								            return false;
								        }

								        // Check if socket was opened successfully
								        if ($this->socket === false) {
								            $this->socket = null;
								            $this->debugMessage('Connection failed. Error-no:'. $errno .' Error message: '. $errstr);
								            return false;
								        }

								        // Set socket timeout (milliseconds converted to microseconds)
								        ini_set('default_socket_timeout', '10');
								        stream_set_timeout($this->socket, 0, $this->socketTimeout * 1000);
								        // The stream is put in blocking mode; reads are bounded by the timeout above.
								        // NOTE(review): the previous comment described non-blocking mode - confirm which is intended
								        stream_set_blocking($this->socket, true);

								        // Calculate connect flags to use in CONNECT header
								        $connectFlags = 0;
								        if ($this->connectCleanSession) $connectFlags |= 0x02;
								        if ($this->connectWill) {
								            $connectFlags |= 0x04;
								            // Will QoS occupies bits 3-4; it must come from the QoS value, not from the will-enabled flag
								            $connectFlags |= (((int) $this->connectWillQos) & 0x03) << 3;
								            if ($this->connectWillRetain) $connectFlags |= 0x20;
								        }
								        // A password is only sent together with a username, so flag and payload stay in step
								        $sendUsername = (bool) $this->connectUsername;
								        $sendPassword = $sendUsername && (bool) $this->connectPassword;
								        if ($sendUsername) $connectFlags |= 0x80;
								        if ($sendPassword) $connectFlags |= 0x40;

								        // Build payload and header for CONNECT-packet
								        $payload = chr(0x00).chr(0x04);   // MSB & LSB length of MQTT = 4
								        $payload .= 'MQTT';
								        $payload .= chr(0x04);            // Protocol level (3.1.1)
								        $payload .= chr($connectFlags);   // Connect flags
								        $payload .= chr($this->connectKeepAlive >> 8);     // Keepalive (MSB)
								        $payload .= chr($this->connectKeepAlive & 0xff);   // Keepalive (LSB)
								        if ($this->connectCleanSession && empty($this->clientId)) {
								            // Generated id is kept as a string, like a caller-supplied one
								            $this->clientId = (string) rand(1,999999999);
								        }
								        if ($this->clientId) {
								            $payload .= $this->createPayload($this->clientId);
								        }
								        if($this->connectWill){
								            $payload .= $this->createPayload($this->willTopic);
								            $payload .= $this->createPayload($this->willMessage);
								        }
								        if ($sendUsername) {
								            $payload .= $this->createPayload($this->connectUsername);
								        }
								        if ($sendPassword) {
								            $payload .= $this->createPayload($this->connectPassword);
								        }
								        $header = $this->createHeader(self::MQTT_CONNECT, $payload);
								        $this->debugMessage('Sending CONNECT');
								        $this->send($header . $payload);

								        // Wait for CONNACK packet
								        $response = $this->waitForPacket(self::MQTT_CONNACK);
								        if ($response === false || !isset($response[2])) {
								            // Nothing usable came back: there is no return code to decode
								            $this->debugMessage('Connection failed! No valid CONNACK received');
								            $this->lastConnectResult = -1;
								            $this->close();
								            return false;
								        }

								        $returnCode = ord($response[2]);
								        if ($returnCode === 0) {
								            $this->debugMessage('Connected to MQTT');
								            $this->lastConnectResult = 0;
								            return true;
								        }

								        $this->debugMessage('Connection failed! Error: '. $returnCode);
								        $this->lastConnectResult = $returnCode;
								        $this->close();
								        return false;
								    }
							 | 
						|
								
							 | 
						|
								    /**
								     * Publish a topic and message (QoS 0,1,2 supported)
								     *
								     * For QoS 1 the call waits for PUBACK. For QoS 2 it waits for PUBREC,
								     * sends PUBREL and waits for PUBCOMP. QoS 0 returns as soon as the packet
								     * has been handed to send().
								     *
								     * @param string $topic
								     * @param string $message
								     * @param byte $qos One of MQTT_QOS0, MQTT_QOS1, MQTT_QOS2
								     * @param integer|boolean $retain Any truthy value sets the RETAIN flag
								     * @return boolean False when not connected, the QoS is invalid, or an expected reply is missing
								     */
								    public function sendPublish($topic, $message, $qos = self::MQTT_QOS1, $retain = 0) {
								        if(!$this->isConnected()) return false;
								        if($qos!=self::MQTT_QOS0 && $qos!=self::MQTT_QOS1 && $qos!=self::MQTT_QOS2) return false;

								        $packetId = $this->getNextPacketId();
								        $payload = $this->createPayload($topic);
								        if($qos >= self::MQTT_QOS1) {
								            // Packet identifier required for QoS level >= 1
								            $payload .= $this->getPacketIdPayload();
								        }
								        $payload .= $message;

								        // Fixed header flags: DUP (bit 3, always 0 here), QoS (bits 1-2), RETAIN (bit 0).
								        // RETAIN is reduced to a single bit so a value such as 2 cannot spill into the QoS bits.
								        $retainFlag = $retain ? 0x01 : 0x00;
								        $qosFlags = ((int) $qos) << 1;
								        $header = $this->createHeader(self::MQTT_PUBLISH | $qosFlags | $retainFlag, $payload);
								        $this->debugMessage('Sending PUBLISH');
								        $this->send($header . $payload);

								        if($qos == self::MQTT_QOS1) {
								            // If QoS level 1, only a PUBACK packet is expected
								            $response = $this->waitForPacket(self::MQTT_PUBACK, $packetId);
								            if($response === false) {
								                $this->debugMessage('Packet missing, expecting PUBACK');
								                return false;
								            }
								        } elseif($qos == self::MQTT_QOS2) {
								            // If QoS level 2, a PUBREC packet is expected
								            $response = $this->waitForPacket(self::MQTT_PUBREC, $packetId);
								            if($response === false) {
								                $this->debugMessage('Packet missing, expecting PUBREC');
								                return false;
								            }

								            // Send PUBREL
								            $response = $this->sendPubRel($packetId);
								            if($response === false) {
								                $this->debugMessage('Failed to send PUBREL');
								                return false;
								            }

								            // A PUBCOMP packet is expected
								            $response = $this->waitForPacket(self::MQTT_PUBCOMP, $packetId);
								            if($response === false) {
								                $this->debugMessage('Packet missing, expecting PUBCOMP');
								                return false;
								            }
								        }

								        return true;
								    }
							 | 
						|
								
							 | 
						|
								    /**
								     * Send PUBACK as response to a received PUBLISH packet (QoS Level 1)
								     *
								     * The payload is the packet identifier as two bytes, high byte first.
								     *
								     * @param integer $packetId Packet identifier of PUBLISH packet
								     * @return boolean False when not connected, otherwise true once the packet was handed to send()
								     */
								    public function sendPubAck($packetId) {
								        if ($this->isConnected()) {
								            $highByte = ($packetId >> 8) & 0xff;
								            $lowByte = $packetId & 0xff;
								            $body = chr($highByte) . chr($lowByte);

								            $fixedHeader = $this->createHeader(self::MQTT_PUBACK, $body);
								            $this->debugMessage('Sending PUBACK');
								            $this->send($fixedHeader . $body);

								            return true;
								        }

								        return false;
								    }
							 | 
						|
								
							 | 
						|
								    /**
								     * Send PUBREC as response to a received PUBLISH packet (QoS Level 2)
								     *
								     * The payload is the packet identifier as two bytes, high byte first.
								     *
								     * @param integer $packetId Packet identifier of PUBLISH packet
								     * @return boolean False when not connected, otherwise true once the packet was handed to send()
								     */
								    public function sendPubRec($packetId) {
								        if ($this->isConnected()) {
								            $highByte = ($packetId >> 8) & 0xff;
								            $lowByte = $packetId & 0xff;
								            $body = chr($highByte) . chr($lowByte);

								            $fixedHeader = $this->createHeader(self::MQTT_PUBREC, $body);
								            $this->debugMessage('Sending PUBREC');
								            $this->send($fixedHeader . $body);

								            return true;
								        }

								        return false;
								    }
							 | 
						|
								
							 | 
						|
								    /**
								     * Send PUBREL as response to a received PUBREC packet (QoS Level 2)
								     *
								     * The payload is the packet identifier as two bytes, high byte first.
								     *
								     * @param integer $packetId Packet identifier of PUBLISH packet
								     * @return boolean False when not connected, otherwise true once the packet was handed to send()
								     */
								    public function sendPubRel($packetId) {
								        if ($this->isConnected()) {
								            $highByte = ($packetId >> 8) & 0xff;
								            $lowByte = $packetId & 0xff;
								            $body = chr($highByte) . chr($lowByte);

								            $fixedHeader = $this->createHeader(self::MQTT_PUBREL, $body);
								            $this->debugMessage('Sending PUBREL');
								            $this->send($fixedHeader . $body);

								            return true;
								        }

								        return false;
								    }
							 | 
						|
								
							 | 
						|
								    /**
								     * Send PUBCOMP as response to a received PUBREL packet (QoS Level 2)
								     *
								     * The payload is the packet identifier as two bytes, high byte first.
								     *
								     * @param integer $packetId Packet identifier of PUBLISH packet
								     * @return boolean False when not connected, otherwise true once the packet was handed to send()
								     */
								    public function sendPubComp($packetId) {
								        if ($this->isConnected()) {
								            $highByte = ($packetId >> 8) & 0xff;
								            $lowByte = $packetId & 0xff;
								            $body = chr($highByte) . chr($lowByte);

								            $fixedHeader = $this->createHeader(self::MQTT_PUBCOMP, $body);
								            $this->debugMessage('Sending PUBCOMP');
								            $this->send($fixedHeader . $body);

								            return true;
								        }

								        return false;
								    }
							 | 
						|
								
							 | 
						|
								    /**
								     * Subscribe to topics with a quality of service
								     *
								     * @param string|string[] $topics Topic, or list of topics, to subscribe to
								     * @param int $qos Requested quality of service, applied to every topic
								     * @return array|bool False if not connected, if the packet could not be sent,
								     *                    if no matching SUBACK arrived or if the SUBACK did not
								     *                    hold one return code per topic. Otherwise an array keyed
								     *                    by topic: ['success' => bool, 'qosGiven' => int|null]
								     */
								    public function sendSubscribe($topics, $qos = self::MQTT_QOS1) {
								        if (!is_array($topics)) $topics = [$topics];
								        if (!$this->isConnected()) return false;

								        $packetId = $this->getNextPacketId();
								        $payload = $this->getPacketIdPayload();
								        foreach ($topics as $topic) {
								            // Each entry: length-prefixed topic filter followed by the requested QoS byte
								            $payload .= $this->createPayload($topic);
								            $payload .= chr($qos);
								        }
								        // SUBSCRIBE is sent with 0x02 in the low nibble of its first byte
								        $header = $this->createHeader(self::MQTT_SUBSCRIBE + 0x02, $payload);
								        $this->debugMessage('Sending SUBSCRIBE');
								        if (!$this->send($header . $payload)) {
								            // No point waiting for a SUBACK to a request that never left
								            $this->debugMessage('Failed to send SUBSCRIBE');
								            return false;
								        }

								        // A SUBACK packet is expected
								        $response = $this->waitForPacket(self::MQTT_SUBACK, $packetId);
								        if ($response === false) {
								            $this->debugMessage('Packet missing, expecting SUBACK');
								            return false;
								        }
								        $responsePayload = substr($response, 3);   // skip header and identifier (3 bytes)
								        if (strlen($responsePayload) != count($topics)) {
								            $this->debugMessage('Did not recieve SUBACK for all topics');
								            return false;
								        }

								        // Check which subscriptions were approved: one return code byte per topic,
								        // in request order. The byte must be converted with ord() before it is
								        // compared numerically; comparing the raw one-character string against an
								        // integer does not compare the code value.
								        $topicsResult = [];
								        $i = 0;
								        foreach ($topics as $topic) {
								            $returnCode = ord($responsePayload[$i]);
								            if ($returnCode > 0x02) {
								                // Anything above QoS 2 is treated as a refused subscription
								                $topicsResult[$topic] = ['success' => false, 'qosGiven' => null];
								            } else {
								                $topicsResult[$topic] = ['success' => true, 'qosGiven' => $returnCode];
								            }
								            $i++;
								        }

								        return $topicsResult;
								    }
							 | 
						|
								
							 | 
						|
								    /**
								     * Send unsubscribe packet for given topics
								     *
								     * @param string|string[] $topics Topic, or list of topics, to unsubscribe from
								     * @return bool True if a matching UNSUBACK was received; false if not connected,
								     *              the packet could not be sent or no UNSUBACK arrived
								     */
								    public function sendUnsubscribe($topics) {
								        // Accept a single topic the same way sendSubscribe() does
								        if (!is_array($topics)) $topics = [$topics];
								        if (!$this->isConnected()) return false;

								        $packetId = $this->getNextPacketId();
								        $payload = $this->getPacketIdPayload();
								        foreach ($topics as $topic) {
								            $payload .= $this->createPayload($topic);
								        }
								        // UNSUBSCRIBE is sent with 0x02 in the low nibble of its first byte
								        $header = $this->createHeader(self::MQTT_UNSUBSCRIBE + 0x02, $payload);
								        $this->debugMessage('Sending UNSUBSCRIBE');
								        if (!$this->send($header . $payload)) {
								            // No point waiting for an UNSUBACK to a request that never left
								            $this->debugMessage('Failed to send UNSUBSCRIBE');
								            return false;
								        }

								        // An UNSUBACK packet is expected
								        $response = $this->waitForPacket(self::MQTT_UNSUBACK, $packetId);
								        if ($response === false) {
								            $this->debugMessage('Invalid packet received, expecting UNSUBACK');
								            return false;
								        }
								        return true;
								    }
							 | 
						|
								
							 | 
						|
								    /**
								     * Send a PINGREQ packet to the server and wait for the answer
								     *
								     * Both timeSincePingReq (before the write) and pingReqTime (after the
								     * write) are stamped with the current time.
								     *
								     * @return bool True if a PINGRESP was received, false if not connected or
								     *              no PINGRESP could be read
								     */
								    public function sendPing() {
								        if (!$this->isConnected()) {
								            return false;
								        }

								        $this->timeSincePingReq = time();
								        $pingRequest = $this->createHeader(self::MQTT_PINGREQ);
								        $this->debugMessage('Sending PING');
								        $this->send($pingRequest);
								        $this->pingReqTime = time();

								        // No packet id is passed, so the first PINGRESP found is taken as the answer
								        if ($this->waitForPacket(self::MQTT_PINGRESP) !== false) {
								            return true;
								        }
								        $this->debugMessage('Invalid packet received, expecting PINGRESP');
								        return false;
								    }
							 | 
						|
								
							 | 
						|
								    /**
								     * Send a DISCONNECT packet and then close the socket.
								     * Does nothing when there is no open connection.
								     */
								    public function sendDisconnect() {
								        if (!$this->isConnected()) {
								            return;
								        }
								        $disconnectPacket = $this->createHeader(self::MQTT_DISCONNECT);
								        $this->debugMessage('Sending DISCONNECT');
								        $this->send($disconnectPacket);
								        $this->close();
								    }
							 | 
						|
								
							 | 
						|
								    /**
								     * Shut down the socket in both directions and forget it.
								     * Also clears the last-activity timestamp. Does nothing when not connected.
								     */
								    public function close() {
								        if (!$this->isConnected()) {
								            return;
								        }
								        $this->debugMessage('Closing socket');
								        stream_socket_shutdown($this->socket, STREAM_SHUT_RDWR);
								        $this->socket = null;
								        $this->serverAliveTime = null;
								    }
							 | 
						|
								
							 | 
						|
								    /**
								     * Tell whether a socket is currently held by this client
								     *
								     * @return bool True when the socket property holds a non-empty value
								     */
								    public function isConnected() {
								        return empty($this->socket) ? false : true;
								    }
							 | 
						|
								
							 | 
						|
								    /**
								     * Check if connection is alive
								     *
								     * The connection counts as alive while it is connected and the keep-alive
								     * interval, measured from the last recorded activity (serverAliveTime),
								     * has not yet run out.
								     *
								     * @return bool
								     */
								    public function isAlive() {
								        if (!$this->isConnected()) {
								            return false;
								        }
								        // Alive means "now" is still inside the window; the previous comparison
								        // was reversed and only became true once the window had expired
								        return time() <= $this->serverAliveTime + $this->connectKeepAlive;
								    }
							 | 
						|
								
							 | 
						|
								    /**
								     * Switch debug mode on or off
								     *
								     * While the flag is set, debugMessage() echoes progress messages.
								     *
								     * @param bool $mode New value of the debug flag (defaults to true)
								     */
								    public function setDebug($mode = true)
								    {
								        $this->debug = $mode;
								    }
							 | 
						|
								
							 | 
						|
								    /**
								     * Echo a message prefixed with "MQTT: " when debug mode is on
								     *
								     * @param string $message Text to print, a line break is appended
								     */
								    private function debugMessage($message) {
								        if (!$this->debug) {
								            return;
								        }
								        echo 'MQTT: '. $message .PHP_EOL;
								    }
							 | 
						|
								
							 | 
						|
								    /**
								     * Return next packet identifier to use in MQTT packet
								     *
								     * The identifier occupies 2 bytes. After 0xffff the counter restarts at 1,
								     * never at 0, because a packet identifier must be non-zero.
								     *
								     * @return int Identifier in the range 1..65535, also stored as the current one
								     */
								    private function getNextPacketId() {
								        // n % 0xffff maps 0..65534 onto itself and 65535 onto 0; adding 1 then
								        // yields n + 1 everywhere except at the wrap, where it yields 1
								        $this->packetId = ((int) $this->packetId % 0xffff) + 1;
								        return $this->packetId;
								    }
							 | 
						|
								
							 | 
						|
								    /**
								     * Encode a packet id as the two bytes used inside an MQTT message
								     *
								     * @param int $packetId Id to encode; any empty value (null, 0, ...) selects
								     *                      the most recently generated packet id instead
								     * @return string Two bytes, most significant byte first
								     */
								    private function getPacketIdPayload($packetId = null) {
								        $id = $packetId ?: $this->packetId;
								        $highByte = ($id & 0xff00) >> 8;
								        $lowByte = $id & 0xff;
								        return chr($highByte) . chr($lowByte);
								    }
							 | 
						|
								
							 | 
						|
								    /**
								     * Prefix a string with its byte length and return the result
								     *
								     * The length is written as two bytes, most significant byte first.
								     *
								     * @param string $payload
								     * @return string Two length bytes followed by the unchanged payload
								     */
								    private function createPayload($payload) {
								        // 'n' = unsigned 16-bit, big endian
								        return pack('n', strlen($payload)) . $payload;
								    }
							 | 
						|
								
							 | 
						|
								    /**
								     * Decode a sequence of length-prefixed strings
								     *
								     * Each entry starts with its length in 2 bytes (most significant byte
								     * first) followed by that many bytes of data. Decoding stops at the first
								     * entry whose data is cut short; a single trailing byte is ignored.
								     *
								     * @param string $payload
								     * @return string[] The decoded entries, in order
								     */
								    private function decodePayload($payload) {
								        $result = [];
								        $total = strlen($payload);
								        $offset = 0;
								        // An entry needs at least its two length bytes
								        while ($total - $offset >= 2) {
								            // Parentheses are required: '+' binds tighter than '<<' in PHP
								            $length = (ord($payload[$offset]) << 8) + ord($payload[$offset + 1]);
								            $offset += 2;
								            if ($total - $offset < $length) {
								                // Fewer bytes left than announced: incomplete entry, stop here
								                break;
								            }
								            $result[] = substr($payload, $offset, $length);
								            $offset += $length;
								        }
								        return $result;
								    }
							 | 
						|
								
							 | 
						|
								    /**
								     * Send data to open socket
								     *
								     * A single fwrite() on a stream may accept only part of the data, so the
								     * remainder is written in further calls for as long as progress is made.
								     * On success the last-activity timestamp (serverAliveTime) is refreshed.
								     *
								     * @param string $data
								     * @return bool Only returns true if all data was sent
								     */
								    private function send($data) {
								        if (!$this->socket) {
								            return false;
								        }
								        $total = strlen($data);
								        $written = 0;
								        do {
								            $chunk = ($written === 0) ? $data : substr($data, $written);
								            $result = fwrite($this->socket, $chunk);
								            if ($result === false) {
								                return false;
								            }
								            $written += $result;
								            // Stop when done, or when a call wrote nothing (avoids spinning forever)
								        } while ($written < $total && $result > 0);

								        if ($written != $total) {
								            return false;
								        }
								        $this->serverAliveTime = time();
								        return true;
								    }
							 | 
						|
								
							 | 
						|
								    /**
								     * Read bytes from socket until x bytes read, eof reached or socket timeout
								     *
								     * @param int $bytes Number of bytes to read
								     * @return string|bool The bytes read. At end of stream the socket is closed
								     *                     and whatever was read so far (possibly fewer bytes
								     *                     than requested) is returned. False if there is no
								     *                     socket or the read timed out.
								     */
								    private function readBytes($bytes) {
								        if (!$this->socket) return false;
								        // fread() refuses a length of 0 (ValueError on PHP 8), and there is
								        // nothing to wait for anyway
								        if ($bytes <= 0) return '';

								        $bytesLeft = $bytes;
								        $result = '';
								        do {
								            // If stream at end, close down socket and exit
								            if (feof($this->socket)) {
								                $this->debugMessage('Reached EOF for stream');
								                $this->close();
								                return $result;
								            }
								            // Try to read from stream
								            $str = fread($this->socket, $bytesLeft);
								            if ($str !== false && strlen($str) > 0) {
								                $result .= $str;
								                $bytesLeft -= strlen($str);
								            }
								            if ($bytesLeft <= 0) {
								                // All bytes read: record the activity and return them
								                $this->serverAliveTime = time();
								                return $result;
								            }
								            // Check if timeout
								            $info = stream_get_meta_data($this->socket);
								            if ($info['timed_out']) {
								                $this->debugMessage('Read timeout');
								                return false;
								            }
								            // Pause before the next attempt; usleep() takes microseconds and
								            // socketReadDelay is multiplied by 1000 here
								            usleep($this->socketReadDelay * 1000);
								        } while (true);
								    }
							 | 
						|
								
							 | 
						|
								    /**
								     * Encode a length as the variable-length byte sequence sent in the stream
								     *
								     * Seven bits of the value go into each byte, least significant group
								     * first; the high bit of a byte marks that another byte follows.
								     *
								     * @param int $len
								     * @return string|bool One to four bytes, or false when $len is negative or
								     *                     does not fit into four bytes (>= 128^4)
								     */
								    private function encodeLength($len) {
								        $upperLimit = 128*128*128*128;
								        if ($len >= $upperLimit || $len < 0) {
								            // illegal length
								            return false;
								        }
								        $encoded = '';
								        for (;;) {
								            $digit = $len & 0x7f;   // lowest 7 bits
								            $len >>= 7;             // drop them
								            if ($len <= 0) {
								                // last group: high bit stays clear
								                return $encoded . chr($digit);
								            }
								            // more groups follow: flag it with the high bit
								            $encoded .= chr($digit | 0x80);
								        }
								    }
							 | 
						|
								
							 | 
						|
								    /**
								     * Return length of packet by reading from stream
								     *
								     * The length is stored in one to four bytes: 7 value bits per byte, least
								     * significant group first, high bit set while more bytes follow.
								     *
								     * @return int|bool The decoded length, or false if the stream ended or
								     *                  timed out, or if a fourth byte still announces another one
								     */
								    private function readPacketLength() {
								        $bytesRead = 0;
								        $len = 0;
								        $multiplier = 1;
								        do {
								            // Four bytes are the maximum; the former "> 4" let a fifth one through
								            if ($bytesRead >= 4) {
								                return false; // Malformed length
								            }
								            $str = $this->readBytes(1);
								            if ($str === false || strlen($str) != 1) {
								                return false; // Unexpected end of stream
								            }
								            $byte = ord($str[0]);
								            $len += ($byte & 0x7f) * $multiplier;
								            $isContinued = ($byte & 0x80);
								            if ($isContinued) {
								                $multiplier *= 128;
								            }
								            $bytesRead++;
								        } while ($isContinued);
								        return $len;
								    }
							 | 
						|
								
							 | 
						|
								    /**
								     * Build the leading bytes of an MQTT packet
								     *
								     * The result is the command byte followed by the encoded byte length of
								     * the payload.
								     *
								     * @param int $command Command byte to send
								     * @param string $payload Payload that will follow the header
								     *
								     * @return string Header to send
								     */
								    private function createHeader($command, $payload = '') {
								        $encodedLength = $this->encodeLength(strlen($payload));
								        return chr($command) . $encodedLength;
								    }
							 | 
						|
								
							 | 
						|
								    /**
								     * Read next packet from stream
								     *
								     * Leading bytes whose upper four bits are all zero are skipped. The
								     * outcome is also recorded in lastReadStatus (one of the READ_STATUS_*
								     * constants).
								     *
								     * @return string|bool The first packet byte followed by the payload (the
								     *                     length bytes are not included), or false when the
								     *                     header, the length or the complete payload could not
								     *                     be read
								     */
								    private function readNextPacket() {
								        do {
								            $header = $this->readBytes(1);
								            if ($header === false) {
								                $this->lastReadStatus = self::READ_STATUS_ERROR_HEADER;
								                return false;
								            }
								        } while ((ord($header)&0xf0) == 0);     // 0 is illegal control code to start with

								        $packetLength = $this->readPacketLength();
								        if ($packetLength === false) {
								            $this->debugMessage('Could not decode packet length');
								            $this->lastReadStatus = self::READ_STATUS_ERROR_PACKETLENGTH;
								            return false;
								        }

								        $payload = $packetLength > 0 ? $this->readBytes($packetLength) : '';
								        // readBytes() hands back a shortened string when the stream ends early;
								        // such a truncated packet must not be passed on as a valid one
								        if ($payload === false || strlen($payload) !== $packetLength) {
								            $this->lastReadStatus = self::READ_STATUS_ERROR_PAYLOAD;
								            return false;
								        }
								        $this->debugMessage('Packet response: '. self::str2hex($header . $payload));
								        $this->lastReadStatus = self::READ_STATUS_OK;
								        return $header . $payload;
								    }
							 | 
						|
								
							 | 
						|
								    /**
								     * Return the status recorded by the most recent packet read
								     *
								     * @return mixed Current value of lastReadStatus
								     */
								    public function getLastReadStatus()
								    {
								        return $this->lastReadStatus;
								    }
							 | 
						|
								
							 | 
						|
								    /**
								     * Tell whether another read attempt is worthwhile
								     *
								     * @return bool True when the last read ended with READ_STATUS_OK and the
								     *              client is still connected
								     */
								    public function hasMoreToRead() {
								        $lastReadSucceeded = ($this->lastReadStatus == self::READ_STATUS_OK);
								        return $lastReadSucceeded && $this->isConnected();
								    }
							 | 
						|
								
							 | 
						|
								    /**
								     * Read packets from the stream and append them to the packet queue
								     *
								     * Reading ends once $maxPackets packets were read or as soon as a read
								     * attempt fails.
								     *
								     * @param int $maxPackets Upper limit of packets to read in this call
								     * @return int Number of packets read
								     */
								    private function readPackets($maxPackets = 100) {
								        $count = 0;
								        while ($count < $maxPackets) {
								            $packet = $this->readNextPacket();
								            if ($packet === false) {
								                break;
								            }
								            $this->packetQueue[] = $packet;
								            $count++;
								        }
								        return $count;
								    }
							 | 
						|
								
							 | 
						|
								    /**
								     * Return the first packet of a given type, looking in the queue first
								     *
								     * If the queue holds no match, packets are read from the stream until one
								     * matches or a read fails. Packets read on the way that do not match are
								     * appended to the queue.
								     *
								     * @param int $header Packet type to look for (only the 4 high bits are compared)
								     * @param int|bool $verifyPacketId Packet id the packet must carry, false to skip the check
								     * @return string|bool The matching packet, or false if none could be read
								     */
								    private function waitForPacket($header, $verifyPacketId = false) {
								        // queued (not yet handled) packets take precedence
								        foreach ($this->packetQueue as $index => $queued) {
								            if (!$this->isPacketVerified($queued, $header, $verifyPacketId)) {
								                continue;
								            }
								            unset($this->packetQueue[$index]);
								            return $queued;
								        }
								        // nothing queued: keep reading until a match shows up or reading fails
								        while (true) {
								            $incoming = $this->readNextPacket();
								            if (empty($incoming)) {
								                // false (read failure) as well as any other empty value ends the wait
								                return false;
								            }
								            if ($this->isPacketVerified($incoming, $header, $verifyPacketId)) {
								                return $incoming;
								            }
								            // some other packet, keep it for later
								            $this->packetQueue[] = $incoming;
								        }
								    }
							 | 
						|
								
							 | 
						|
								    /**
								     * Check whether a packet has a given type and, optionally, a given packet id
								     *
								     * The type is compared on the upper four bits of the packet's first byte.
								     * The packet id is taken from the two bytes that follow the first byte.
								     *
								     * @param string $packet
								     * @param int $header Packet type to match
								     * @param int|bool $verifyPacketId Packet id to match, false to skip the check
								     * @return bool
								     */
								    private function isPacketVerified($packet, $header, $verifyPacketId = false) {
								        if (!is_string($packet) || strlen($packet) < 1) {
								            return false;
								        }
								        $actualType = (int)(ord($packet[0])&0xf0);
								        $wantedType = (int)($header&0xf0);
								        if ($actualType != $wantedType) {
								            return false;
								        }
								        if ($verifyPacketId === false) {
								            // type matches and no id was asked for
								            return true;
								        }
								        if (strlen($packet) < 3) {
								            // too short to carry a packet id
								            return false;
								        }
								        $receivedPacketId = (int)(ord($packet[1])<<8) + ord($packet[2]);
								        return $verifyPacketId == $receivedPacketId;
								    }
							 | 
						|
								
							 | 
						|
								    /**
								     * Take all packets of a given type out of the queue
								     *
								     * @param int $header Packet type to match (packet ids are not checked)
								     * @return string[] The matching packets in queue order; they are removed from the queue
								     */
								    public function getQueuePackets($header) {
								        $matches = [];
								        foreach ($this->packetQueue as $index => $queued) {
								            if (!$this->isPacketVerified($queued, $header)) {
								                continue;
								            }
								            unset($this->packetQueue[$index]);
								            $matches[] = $queued;
								        }
								        return $matches;
								    }
							 | 
						|
								
							 | 
						|
								    /**
								     * Read packets, pick out the PUBLISH packets and return them as messages
								     *
								     * A message is left out of the result when it cannot be decoded or when a
								     * requested confirmation step fails.
								     *
								     * @param int $maxMessages Max number of packets to read from the stream
								     * @param bool $sendPubAck If true, answer QoS 1 messages with PUBACK
								     * @param bool $sendPubRec If true, answer QoS 2 messages with PUBREC, wait for
								     *                         PUBREL and then send PUBCOMP
								     * @return array[] Decoded messages (as built by decodePublish()) that were
								     *                 confirmed, or for which no confirmation was needed/wanted
								     */
								    public function getPublishMessages($maxMessages = 100, $sendPubAck = false, $sendPubRec = false) {
								        $this->readPackets($maxMessages);

								        $messages = [];
								        foreach ($this->getQueuePackets(self::MQTT_PUBLISH) as $packet) {
								            $message = $this->decodePublish($packet);
								            if ($message === false) {
								                $this->debugMessage('Message could not be decoded');
								                continue;
								            }
								            $qos = $message['qos'];
								            $packetId = $message['packetId'];

								            if ($sendPubAck && ($qos == self::MQTT_QOS1)) {
								                // QoS 1: a single PUBACK confirms the message
								                if ($this->sendPubAck($packetId) === false) {
								                    $this->debugMessage('Failed to send PUBACK');
								                    continue;
								                }
								            } elseif ($sendPubRec && ($qos == self::MQTT_QOS2)) {
								                // QoS 2, step 1: PUBREC
								                if ($this->sendPubRec($packetId) === false) {
								                    $this->debugMessage('Failed to send PUBREC');
								                    continue;
								                }
								                // QoS 2, step 2: the matching PUBREL must arrive
								                if ($this->waitForPacket(self::MQTT_PUBREL, $packetId) === false) {
								                    $this->debugMessage('Packet missing, expecting PUBREL');
								                    continue;
								                }
								                // QoS 2, step 3: PUBCOMP
								                if ($this->sendPubComp($packetId) === false) {
								                    $this->debugMessage('Failed to send PUBCOMP');
								                    continue;
								                }
								            }

								            // confirmed, or no confirmation needed/wanted --> keep it
								            $messages[] = $message;
								        }
								        return $messages;
								    }
							 | 
						|
								
							 | 
						|
								    /**
								     * Decode a publish packet to its attributes
								     *
								     * Expects the packet as its first byte followed directly by the topic
								     * length (2 bytes), i.e. without the encoded packet length in between.
								     *
								     * @param string $packet
								     * @return array|bool False if $packet is not a string or is 3 bytes or
								     *                    shorter. Otherwise an array with the keys 'topic',
								     *                    'message', 'retain', 'duplicate', 'qos' and 'packetId'
								     *                    ('packetId' is null for QoS 0 or when it is missing)
								     */
								    public function decodePublish($packet) {
								        if (!is_string($packet) || (strlen($packet) <= 3)) {
								            return false;
								        }
								        // Low nibble of the first byte: DUP (bit 3), QoS (bits 2-1), RETAIN (bit 0).
								        // Each flag is tested with its own mask so that it is reported
								        // independently of the other bits.
								        $flags = ord($packet[0]) & 0x0f;
								        $duplicate = ($flags & 0x08) !== 0;
								        $retain = ($flags & 0x01) !== 0;
								        $qos = ($flags >> 1) & 0x03;
								        $topicLength = (ord($packet[1]) << 8) + ord($packet[2]);
								        $topic = (string) substr($packet, 3, $topicLength);

								        // Everything after the topic
								        $payload = (string) substr($packet, 3 + $topicLength);
								        if ($qos == 0) {
								            // no packet id for QoS 0, the payload is the message
								            $message = $payload;
								            $packetId = null;
								        } elseif (strlen($payload) >= 2) {
								            $packetId = (ord($payload[0]) << 8) + ord($payload[1]);
								            $message = (string) substr($payload, 2);   // skip packet id (2 bytes) for QoS 1 and 2
								        } else {
								            // 2 byte packet id required, but not found. exit gracefully (no failure)
								            $packetId = null;
								            $message = '';
								        }
								        return [
								            'topic' => self::convertActiveMqTopic($topic),
								            'message' => $message,
								            'retain' => $retain,
								            'duplicate' => $duplicate,
								            'qos' => $qos,
								            'packetId' => $packetId,
								        ];
								    }
							 | 
						|
								
							 | 
						|
								    /**
								     * Translate ActiveMQ topic syntax into its MQTT equivalent
								     *
								     * Maps "." to "/", "*" to "+" and ">" to "#".
								     *
								     * @param string $topic
								     * @return string
								     */
								    private static function convertActiveMqTopic($topic) {
								        // Pairs are applied in order; no replacement character is itself searched for,
								        // so one array-form call gives the same result as three chained calls.
								        $activeMqChars = ['.', '*', '>'];
								        $mqttChars = ['/', '+', '#'];
								        return str_replace($activeMqChars, $mqttChars, $topic);
								    }
							 | 
						|
								
							 | 
						|
								    /**
								     * Return a string rendered as hex bytes followed by its ASCII representation
								     * Good for displaying received packets
								     *
								     * Every byte is written as two lowercase hex digits plus a space. The ASCII
								     * part is wrapped in double quotes; bytes outside the printable range
								     * 0x20-0x7e are shown as ".".
								     *
								     * @param string $str
								     * @return string
								     */
								    private function str2hex($str) {
								        $hex = '';
								        $ascii = '';
								        $length = strlen($str);
								        for ($i = 0; $i < $length; $i++) {
								            $char = $str[$i];
								            $code = ord($char);
								            // 0x7f (DEL) is a control character, so the printable range ends at 0x7e
								            if ($code >= 0x20 && $code <= 0x7e) {
								                $ascii .= $char;
								            } else {
								                $ascii .= '.';
								            }
								            // Zero-pad so every byte takes two digits and the columns line up
								            $hex .= sprintf('%02x ', $code);
								        }
								        return $hex . '"' . $ascii . '"';
								    }
							 | 
						|
								
							 | 
						|
								    /**
								     * Print every packet in the packet queue as a hex/ASCII dump, one per line
								     *
								     * @return void
								     */
								    public function dumpQueue() {
								        foreach ($this->packetQueue as $packet) {
								            // The dump has to be echoed; building the string alone produces no output
								            echo $this->str2hex($packet) . PHP_EOL;
								        }
								    }
							 | 
						|
								}
							 |