You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
1098 lines
37 KiB
1098 lines
37 KiB
<?php
|
|
declare(strict_types=1);
|
|
|
|
namespace App\Libs;
|
|
|
|
class MQTTClient {
|
|
|
|
// MQTT control packet types: first byte of the fixed header, type in the high nibble.
// MQTT_PUBREL also carries 0x02 in the low nibble (flag bits of the fixed header).
const MQTT_CONNECT = 0x10;     // Client request to connect to Server
const MQTT_CONNACK = 0x20;     // Connect acknowledgment
const MQTT_PUBLISH = 0x30;     // Publish message
const MQTT_PUBACK = 0x40;      // Publish acknowledgment
const MQTT_PUBREC = 0x50;      // Publish received (assured delivery part 1)
const MQTT_PUBREL = 0x62;      // Publish release (assured delivery part 2)
const MQTT_PUBCOMP = 0x70;     // Publish complete (assured delivery part 3)
const MQTT_SUBSCRIBE = 0x80;   // Client subscribe request
const MQTT_SUBACK = 0x90;      // Subscribe acknowledgment
const MQTT_UNSUBSCRIBE = 0xa0; // Unsubscribe request
const MQTT_UNSUBACK = 0xb0;    // Unsubscribe acknowledgment
const MQTT_PINGREQ = 0xc0;     // PING request
const MQTT_PINGRESP = 0xd0;    // PING response
const MQTT_DISCONNECT = 0xe0;  // Client is disconnecting

// MQTT quality of service levels
const MQTT_QOS0 = 0x00;
const MQTT_QOS1 = 0x01;
const MQTT_QOS2 = 0x02;

// Outcome of the most recent attempt to read a packet from the stream
const READ_STATUS_NO_READ = 0;              // Nothing has been read yet
const READ_STATUS_OK = 200;                 // A complete packet was read
const READ_STATUS_ERROR = 400;              // Generic read error
const READ_STATUS_ERROR_HEADER = 401;       // The first header byte could not be read
const READ_STATUS_ERROR_PACKETLENGTH = 402; // The length field could not be decoded
const READ_STATUS_ERROR_PAYLOAD = 403;      // The packet body could not be read

// Socket handling
private $socket = null;          // Socket resource reference (null while not connected)
private $socketTimeout;          // Default socket timeout in milliseconds
private $socketReadDelay = 1000; // Delay in milliseconds between read attempts on socket

// Connection target
private $protocol = 'tcp';
private $serverAddress;
private $serverPort;
private $clientId;

// Certificate files used when an encryption protocol is selected
public $caFile = null;
public $localCert = null;
public $localPrivateKey = null;

// Values used to build the CONNECT packet
private $connectCleanSession;
private $connectKeepAlive;
private $connectWill = false;
private $connectWillQos;
private $connectWillRetain;
private $connectUsername;
private $connectPassword;

// Will topic and message, set through setWill()
private $willTopic;
private $willMessage;

private $packetId;        // Packet identifier to validate return packets
private $pingReqTime;     // Time when last PINGREQ was sent
private $serverAliveTime; // Time of the last successful write to / read from the server

private $debug = false;
private $lastReadStatus = self::READ_STATUS_NO_READ;

private $packetQueue = []; // Queue for received but unhandled packets

public $lastConnectResult = 0;
|
|
|
|
/**
 * Class constructor - stores the connection parameters and seeds the packet id.
 *
 * The validation result of setConnection() is not used here; an unsupported
 * protocol falls back to 'tcp' inside setConnection().
 *
 * @param string $address  Address to server
 * @param string $port     Port to server
 * @param string $protocol Which protocol to use
 */
function __construct($address, $port=null, $protocol='tcp'){
    $this->setConnection($address, $port, $protocol);

    // Start on a random multiple of 100 to reduce the risk of duplicate ids in sequential sessions
    $seed = rand(1, 100);
    $this->packetId = $seed * 100;
}
|
|
|
|
/**
 * Class destructor - delegates to close() so an open socket is shut down
 * when the client object is destroyed.
 */
function __destruct(){
    $this->close();
}
|
|
|
|
/**
 * Set up connection parameters.
 *
 * Address and port are stored as given. The protocol is lower-cased and must be
 * 'tcp' or one of the supported encryption protocols; otherwise 'tcp' is used.
 *
 * @param string $address
 * @param string $port
 * @param string $protocol
 * @return boolean False if the protocol failed validation and the default (tcp) was applied
 */
function setConnection($address, $port=null, $protocol='tcp'){
    $this->serverAddress = $address;
    $this->serverPort = $port;

    $protocol = strtolower($protocol);
    $isSupported = ($protocol == 'tcp') || self::isEncryptionProtocol($protocol);
    if ($isSupported) {
        $this->protocol = $protocol;
        return true;
    }

    // Unknown protocol: report it and fall back to plain tcp
    $this->debugMessage('Invalid protocol ('.$protocol.'). Setting to default (tcp).');
    $this->protocol = 'tcp';
    return false;
}
|
|
|
|
/**
 * Build the url used to open the stream: "<protocol>://<address>[:<port>]".
 *
 * The scheme part is left out when no protocol is set, the port part when no port is set.
 *
 * @return string
 */
private function getUrl() {
    $scheme = $this->protocol ? $this->protocol .'://' : '';
    $port = $this->serverPort ? ':'. $this->serverPort : '';
    return $scheme . $this->serverAddress . $port;
}
|
|
|
|
/**
 * Check if the given protocol is one of the supported encryption protocols.
 *
 * The comparison is case-insensitive. Non-string values (for example null) are
 * never a supported protocol; they are rejected up front because strtolower()
 * throws a TypeError for them under strict_types.
 *
 * @param string $protocol
 * @return boolean
 */
private static function isEncryptionProtocol($protocol) {
    if (!is_string($protocol)) {
        return false;
    }
    $supported = ['ssl', 'tls', 'tlsv1.0', 'tlsv1.1', 'tlsv1.2', 'sslv3'];
    return in_array(strtolower($protocol), $supported, true);
}
|
|
|
|
/**
 * Sets server certificate and (optionally) protocol for ssl/tls encryption.
 *
 * The CA file is stored as soon as it is found to exist. When $protocol is null
 * the currently configured protocol is left unchanged. A given protocol is
 * stored lower-cased, the same way setConnection() stores it.
 *
 * @param string      $caFile   CA file to identify server
 * @param string|null $protocol Crypto protocol (See http://php.net/manual/en/migration56.openssl.php)
 * @return boolean False if settings failed, else true
 */
public function setEncryption($caFile, $protocol = null) {
    if (!file_exists($caFile)) {
        $this->debugMessage('CA file not found');
        return false;
    }
    $this->caFile = $caFile;

    // No protocol requested: keep the current one (previously this path raised a
    // TypeError because null was handed to strtolower() under strict_types)
    if (is_null($protocol)) {
        return true;
    }

    if (!is_string($protocol) || !self::isEncryptionProtocol($protocol)) {
        $this->debugMessage('Unknown encryption protocol');
        return false;
    }

    $this->protocol = strtolower($protocol);
    return true;
}
|
|
|
|
/**
 * Sets client certificate and key files for client-side authentication.
 *
 * Both files must exist; nothing is stored unless both checks pass.
 *
 * @param string $certificateFile Client certificate file
 * @param string $keyFile         Client key file
 * @return boolean False if settings failed, else true
 */
public function setClientEncryption($certificateFile, $keyFile) {
    // Checked in this order so the certificate is reported first when both are missing
    $requiredFiles = [
        'Client certificate file not found' => $certificateFile,
        'Client key file not found' => $keyFile,
    ];
    foreach ($requiredFiles as $notFoundMessage => $path) {
        if (!file_exists($path)) {
            $this->debugMessage($notFoundMessage);
            return false;
        }
    }

    $this->localPrivateKey = $keyFile;
    $this->localCert = $certificateFile;
    return true;
}
|
|
|
|
/**
 * Store the credentials that sendConnect() puts into the CONNECT packet.
 *
 * @param string $username Username
 * @param string $password Password
 */
public function setAuthentication($username, $password) {
    $this->connectPassword = $password;
    $this->connectUsername = $username;
}
|
|
|
|
/**
 * Register a will (last message defined by MQTT) to be announced in the CONNECT packet.
 *
 * Calling this method switches the will on; the values are only stored here and
 * are used the next time sendConnect() builds a CONNECT packet.
 *
 * @param string  $topic   Topic the will is published to
 * @param string  $message Will message
 * @param integer $qos     Quality of service of the will
 * @param boolean $retain  Whether the will is to be retained
 */
public function setWill($topic, $message, $qos=1, $retain=false) {
    $this->willTopic = $topic;
    $this->willMessage = $message;

    $this->connectWillQos = $qos;
    $this->connectWillRetain = $retain;
    $this->connectWill = true;
}
|
|
|
|
/**
 * Connect to MQTT server: open the socket, send CONNECT and wait for CONNACK.
 *
 * After the call $lastConnectResult holds 0 on success, the CONNACK return code
 * when the server refused the connection, or -1 when CONNECT could not be sent
 * or no usable CONNACK arrived. It is left untouched when the method returns
 * early because of invalid arguments or a socket that could not be opened.
 *
 * @param string  $clientId     Unique id used by the server to identify the client
 * @param boolean $cleanSession Set true to clear session on server, ie queued messages are purged (not received)
 * @param integer $keepAlive    Number of seconds a connection is considered to be alive without traffic
 * @param integer $timeout      Number of milliseconds before timeout when reading from socket
 * @return boolean Returns false if connection failed
 */
public function sendConnect($clientId, $cleanSession=false, $keepAlive=10, $timeout=5000) {
    if (!$this->serverAddress) return false;

    // Basic validation of clientid
    // Note: A MQTT server may accept other chars and more than 23 chars in the clientid but that is optional,
    // all chars below up to 23 chars are required to be accepted (see section "3.1.3.1 Client Identifier" of the standard)
    if(preg_match("/[^0-9a-zA-Z]/",$clientId)) {
        $this->debugMessage('ClientId can only contain characters 0-9,a-z,A-Z');
        return false;
    }
    if(strlen($clientId) > 23) {
        $this->debugMessage('ClientId max length is 23 characters/numbers');
        return false;
    }
    $this->clientId = $clientId;

    $this->connectCleanSession = $cleanSession;
    $this->connectKeepAlive = $keepAlive;
    $this->socketTimeout = $timeout;

    // Setup certificates if encryption protocol selected
    if ($this->isEncryptionProtocol($this->protocol)) {
        // NOTE(review): this list still contains RC4 suites; consider pruning it.
        $mozillaCiphers = implode(':', [
            'ECDHE-RSA-AES128-GCM-SHA256',
            'ECDHE-ECDSA-AES128-GCM-SHA256',
            'ECDHE-RSA-AES256-GCM-SHA384',
            'ECDHE-ECDSA-AES256-GCM-SHA384',
            'DHE-RSA-AES128-GCM-SHA256',
            'DHE-DSS-AES128-GCM-SHA256',
            'kEDH+AESGCM',
            'ECDHE-RSA-AES128-SHA256',
            'ECDHE-ECDSA-AES128-SHA256',
            'ECDHE-RSA-AES128-SHA',
            'ECDHE-ECDSA-AES128-SHA',
            'ECDHE-RSA-AES256-SHA384',
            'ECDHE-ECDSA-AES256-SHA384',
            'ECDHE-RSA-AES256-SHA',
            'ECDHE-ECDSA-AES256-SHA',
            'DHE-RSA-AES128-SHA256',
            'DHE-RSA-AES128-SHA',
            'DHE-DSS-AES128-SHA256',
            'DHE-RSA-AES256-SHA256',
            'DHE-DSS-AES256-SHA',
            'DHE-RSA-AES256-SHA',
            'AES128-GCM-SHA256',
            'AES256-GCM-SHA384',
            'ECDHE-RSA-RC4-SHA',
            'ECDHE-ECDSA-RC4-SHA',
            'AES128',
            'AES256',
            'RC4-SHA',
            'HIGH',
            '!aNULL',
            '!eNULL',
            '!EXPORT',
            '!DES',
            '!3DES',
            '!MD5',
            '!PSK',
        ]);
        // Secure socket communication with these parameters, a ca-file is required
        $options = [];
        $options['verify_peer'] = true;
        $options['verify_peer_name'] = true;
        $options['verify_depth'] = 5;
        $options['disable_compression'] = true;
        $options['SNI_enabled'] = true;
        $options['ciphers'] = $mozillaCiphers;
        if($this->caFile) {
            $options['cafile'] = $this->caFile;
        }
        if($this->localCert) {
            $options['local_cert'] = $this->localCert;
            if ($this->localPrivateKey) {
                $options['local_pk'] = $this->localPrivateKey;
            }
        }
        $socketContext = stream_context_create(['ssl' => $options]);
        $this->debugMessage('Settings socket options: '. var_export($options, true));
    } else {
        $socketContext = null;
    }

    // Try to open socket
    try {
        $this->debugMessage('Opening socket to: '. $this->getUrl());
        if ($socketContext) {
            $this->socket = stream_socket_client($this->getUrl(), $errno, $errstr, 10, STREAM_CLIENT_CONNECT, $socketContext);
        } else {
            $this->socket = stream_socket_client($this->getUrl(), $errno, $errstr, 10, STREAM_CLIENT_CONNECT);
        }
    } catch (\ErrorException $error) {
        // Only reached when an error handler converts PHP warnings into ErrorException
        $this->debugMessage('Exception: Could not open stream with error message: '. $error->getMessage());
        $this->socket = null;
        return false;
    }

    // Check if socket was opened successfully
    if ($this->socket === false) {
        $this->socket = null;
        $this->debugMessage('Connection failed. Error-no:'. $errno .' Error message: '. $errstr);
        return false;
    }

    // Set socket timeout (stream_set_timeout takes seconds + microseconds; timeout is in milliseconds)
    ini_set('default_socket_timeout', 10);
    stream_set_timeout($this->socket, 0, $this->socketTimeout * 1000);
    // Keep the stream in blocking mode: reads wait for data until the timeout above expires
    stream_set_blocking($this->socket, true);

    // Username/password presence; a password is only sent together with a username,
    // so the flag bits and the payload fields always agree
    $hasUsername = (bool) $this->connectUsername;
    $hasPassword = $hasUsername && (bool) $this->connectPassword;

    // Calculate connect flags to use in CONNECT header
    $connectFlags = 0;
    if ($this->connectCleanSession) $connectFlags |= 0x02;
    if ($this->connectWill) {
        $connectFlags |= 0x04;
        // Will QoS occupies bits 3-4 (the previous code shifted the boolean will flag instead of the QoS)
        $connectFlags |= (((int) $this->connectWillQos) & 0x03) << 3;
        if ($this->connectWillRetain) $connectFlags |= 0x20;
    }
    if ($hasUsername) $connectFlags |= 0x80;
    if ($hasPassword) $connectFlags |= 0x40;

    // Build payload and header for CONNECT-packet
    $payload = chr(0x00).chr(0x04);                     // MSB & LSB length of MQTT = 4
    $payload .= 'MQTT';
    $payload .= chr(0x04);                              // Protocol level (3.1.1)
    $payload .= chr($connectFlags);                     // Connect flags
    $payload .= chr($this->connectKeepAlive >> 8);      // Keepalive (MSB)
    $payload .= chr($this->connectKeepAlive & 0xff);    // Keepalive (LSB)
    if ($this->connectCleanSession && $this->clientId === '') {
        // Generated id must be a string: createPayload() measures it with strlen()
        $this->clientId = (string) rand(1,999999999);
    }
    // The client id field is always written (possibly zero-length); omitting it would
    // shift every following field of the packet
    $payload .= $this->createPayload((string) $this->clientId);
    if($this->connectWill){
        $payload .= $this->createPayload($this->willTopic);
        $payload .= $this->createPayload($this->willMessage);
    }
    if($hasUsername) {
        $payload .= $this->createPayload($this->connectUsername);
    }
    if ($hasPassword) {
        $payload .= $this->createPayload($this->connectPassword);
    }
    $header = $this->createHeader(self::MQTT_CONNECT, $payload);
    $this->debugMessage('Sending CONNECT');
    if (!$this->send($header . $payload)) {
        $this->debugMessage('Failed to send CONNECT');
        $this->lastConnectResult = -1;
        $this->close();
        return false;
    }

    // Wait for CONNACK packet: [0] header byte, [1] acknowledge flags, [2] return code
    $response = $this->waitForPacket(self::MQTT_CONNACK);
    if (!is_string($response) || strlen($response) < 3) {
        $this->debugMessage('Connection failed! No valid CONNACK received');
        $this->lastConnectResult = -1;
        $this->close();
        return false;
    }

    $returnCode = ord($response[2]);
    if ($returnCode === 0) {
        $this->debugMessage('Connected to MQTT');
        $this->lastConnectResult = 0;
        return true;
    }

    $this->debugMessage('Connection failed! Error: '. $returnCode);
    $this->lastConnectResult = $returnCode;
    $this->close();
    return false;
}
|
|
|
|
/**
 * Publish a topic and message (QoS 0,1,2 supported).
 *
 * For QoS 1 the method waits for PUBACK; for QoS 2 it waits for PUBREC, sends
 * PUBREL and waits for PUBCOMP. For QoS 0 nothing is awaited.
 *
 * @param string  $topic
 * @param string  $message
 * @param byte    $qos    One of MQTT_QOS0, MQTT_QOS1, MQTT_QOS2
 * @param integer $retain Truthy to set the RETAIN flag
 * @return boolean False if not connected, the QoS is invalid, the packet could not
 *                 be written or an expected confirmation packet did not arrive
 */
public function sendPublish($topic, $message, $qos = self::MQTT_QOS1, $retain = 0) {
    if(!$this->isConnected()) return false;

    if($qos!=self::MQTT_QOS0 && $qos!=self::MQTT_QOS1 && $qos!=self::MQTT_QOS2) return false;

    $packetId = $this->getNextPacketId();
    $payload = $this->createPayload($topic);
    if($qos >= self::MQTT_QOS1) {
        // Packet identifier required for QoS level >= 1
        $payload .= $this->getPacketIdPayload();
    }
    $payload .= $message;

    // Flags: bit 3 DUP (never set here), bits 2-1 QoS, bit 0 RETAIN.
    // RETAIN is reduced to a single bit so other values cannot spill into the QoS bits.
    $dupFlag = 0;
    $retainFlag = $retain ? 0x01 : 0x00;
    $header = $this->createHeader(self::MQTT_PUBLISH + ($dupFlag<<3) + ($qos<<1) + $retainFlag, $payload);
    $this->debugMessage('Sending PUBLISH');
    if (!$this->send($header . $payload)) {
        // Without this check a failed write was reported as success for QoS 0
        $this->debugMessage('Failed to send PUBLISH');
        return false;
    }

    if($qos == self::MQTT_QOS1) {
        // If QoS level 1, only a PUBACK packet is expected
        $response = $this->waitForPacket(self::MQTT_PUBACK, $packetId);
        if($response === false) {
            $this->debugMessage('Packet missing, expecting PUBACK');
            return false;
        }
    } elseif($qos == self::MQTT_QOS2) {
        // If QoS level 2, a PUBREC packet is expected
        $response = $this->waitForPacket(self::MQTT_PUBREC, $packetId);
        if($response === false) {
            $this->debugMessage('Packet missing, expecting PUBREC');
            return false;
        }

        // Send PUBREL
        $response = $this->sendPubRel($packetId);
        if($response === false) {
            $this->debugMessage('Failed to send PUBREL');
            return false;
        }

        // A PUBCOMP packet is expected
        $response = $this->waitForPacket(self::MQTT_PUBCOMP, $packetId);
        if($response === false) {
            $this->debugMessage('Packet missing, expecting PUBCOMP');
            return false;
        }
    }

    return true;
}
|
|
|
|
/**
 * Send PUBACK as response to a received PUBLISH packet (QoS Level 1).
 *
 * @param integer $packetId Packet identifier of PUBLISH packet
 * @return boolean Returns true if packet sent successfully, false if not
 *                 connected or the packet could not be written completely
 */
public function sendPubAck($packetId) {
    if(!$this->isConnected()) return false;

    // Packet identifier as two bytes, most significant first
    $payload = chr(($packetId & 0xff00)>>8) . chr($packetId & 0xff);
    $header = $this->createHeader(self::MQTT_PUBACK, $payload);
    $this->debugMessage('Sending PUBACK');

    // Report the real write result instead of an unconditional true
    return $this->send($header . $payload);
}
|
|
|
|
/**
 * Send PUBREC as response to a received PUBLISH packet (QoS Level 2).
 *
 * @param integer $packetId Packet identifier of PUBLISH packet
 * @return boolean Returns true if packet sent successfully, false if not
 *                 connected or the packet could not be written completely
 */
public function sendPubRec($packetId) {
    if(!$this->isConnected()) return false;

    // Packet identifier as two bytes, most significant first
    $payload = chr(($packetId & 0xff00)>>8) . chr($packetId & 0xff);
    $header = $this->createHeader(self::MQTT_PUBREC, $payload);
    $this->debugMessage('Sending PUBREC');

    // Report the real write result instead of an unconditional true
    return $this->send($header . $payload);
}
|
|
|
|
/**
 * Send PUBREL as response to a received PUBREC packet (QoS Level 2).
 *
 * @param integer $packetId Packet identifier of PUBLISH packet
 * @return boolean Returns true if packet sent successfully, false if not
 *                 connected or the packet could not be written completely
 */
public function sendPubRel($packetId) {
    if(!$this->isConnected()) return false;

    // Packet identifier as two bytes, most significant first
    $payload = chr(($packetId & 0xff00)>>8) . chr($packetId & 0xff);
    $header = $this->createHeader(self::MQTT_PUBREL, $payload);
    $this->debugMessage('Sending PUBREL');

    // Report the real write result instead of an unconditional true
    return $this->send($header . $payload);
}
|
|
|
|
/**
 * Send PUBCOMP as response to a received PUBREL packet (QoS Level 2).
 *
 * @param integer $packetId Packet identifier of PUBLISH packet
 * @return boolean Returns true if packet sent successfully, false if not
 *                 connected or the packet could not be written completely
 */
public function sendPubComp($packetId) {
    if(!$this->isConnected()) return false;

    // Packet identifier as two bytes, most significant first
    $payload = chr(($packetId & 0xff00)>>8) . chr($packetId & 0xff);
    $header = $this->createHeader(self::MQTT_PUBCOMP, $payload);
    $this->debugMessage('Sending PUBCOMP');

    // Report the real write result instead of an unconditional true
    return $this->send($header . $payload);
}
|
|
|
|
/**
 * Subscribe to topics with a quality of service.
 *
 * @param string[]|string $topics Topics to subscribe for (a single topic may be given as a string)
 * @param integer         $qos    Quality of service requested for all topics
 * @return array|boolean False if not connected, the packet could not be sent or no
 *                       matching SUBACK was received. Otherwise an array keyed by topic,
 *                       each entry holding 'success' (boolean) and 'qosGiven'
 *                       (granted QoS as integer, null when the subscription was refused)
 */
public function sendSubscribe($topics, $qos = self::MQTT_QOS1) {
    if (!is_array($topics)) $topics = [$topics];
    if(!$this->isConnected()) return false;

    $packetId = $this->getNextPacketId();
    $payload = $this->getPacketIdPayload();
    foreach($topics as $topic) {
        $payload .= $this->createPayload($topic);
        $payload .= chr($qos);
    }
    // Low nibble 0x02 is sent as the flag bits of the SUBSCRIBE fixed header
    $header = $this->createHeader(self::MQTT_SUBSCRIBE + 0x02, $payload);
    $this->debugMessage('Sending SUBSCRIBE');
    if (!$this->send($header . $payload)) {
        $this->debugMessage('Failed to send SUBSCRIBE');
        return false;
    }

    // A SUBACK packet is expected
    $response = $this->waitForPacket(self::MQTT_SUBACK, $packetId);
    if($response === false) {
        $this->debugMessage('Packet missing, expecting SUBACK');
        return false;
    }
    $responsePayload = substr($response, 3); // skip header and identifier (3 bytes)
    if (strlen($responsePayload) != count($topics)) {
        $this->debugMessage('Did not recieve SUBACK for all topics');
        return false;
    }

    // Check which subscriptions were approved: one return code byte per topic, in request order
    $topicsResult = [];
    $i = 0;
    foreach ($topics as $topic) {
        // Compare the numeric byte value; comparing the raw character with an
        // integer only gave the right answer by accident of string comparison
        $returnCode = ord($responsePayload[$i]);
        if ($returnCode > 0x02) {
            $topicsResult[$topic] = ['success' => false, 'qosGiven' => null];
        } else {
            $topicsResult[$topic] = ['success' => true, 'qosGiven' => $returnCode];
        }
        $i++;
    }

    return $topicsResult;
}
|
|
|
|
/**
 * Send unsubscribe packet for given topics.
 *
 * @param string[]|string $topics Topics to unsubscribe from (a single topic may be given as a string)
 * @return boolean Returns true if UNSUBACK was received
 */
public function sendUnsubscribe($topics) {
    // Accept a single topic string, the same way sendSubscribe() does
    if (!is_array($topics)) $topics = [$topics];
    if(!$this->isConnected()) return false;

    $packetId = $this->getNextPacketId();
    $payload = $this->getPacketIdPayload();
    foreach($topics as $topic) {
        $payload .= $this->createPayload($topic);
    }
    // Low nibble 0x02 is sent as the flag bits of the UNSUBSCRIBE fixed header
    $header = $this->createHeader(self::MQTT_UNSUBSCRIBE + 0x02, $payload);
    $this->debugMessage('Sending UNSUBSCRIBE');
    if (!$this->send($header . $payload)) {
        $this->debugMessage('Failed to send UNSUBSCRIBE');
        return false;
    }

    // An UNSUBACK packet is expected
    $response = $this->waitForPacket(self::MQTT_UNSUBACK, $packetId);
    if($response === false) {
        $this->debugMessage('Invalid packet received, expecting UNSUBACK');
        return false;
    }
    return true;
}
|
|
|
|
/**
 * Sends PINGREQ packet to server and waits for the PINGRESP.
 *
 * The time of the request is recorded in $pingReqTime once the packet was written.
 *
 * @return boolean Returns true if PINGRESP was received
 */
public function sendPing() {
    if(!$this->isConnected()) return false;

    // The former write to the undeclared property "timeSincePingReq" was dropped:
    // it created a dynamic property and duplicated $pingReqTime
    $header = $this->createHeader(self::MQTT_PINGREQ);
    $this->debugMessage('Sending PING');
    if (!$this->send($header)) {
        $this->debugMessage('Failed to send PINGREQ');
        return false;
    }
    $this->pingReqTime = time();

    // A PINGRESP packet is expected
    $response = $this->waitForPacket(self::MQTT_PINGRESP);
    if($response === false) {
        $this->debugMessage('Invalid packet received, expecting PINGRESP');
        return false;
    }

    return true;
}
|
|
|
|
/**
 * Send DISCONNECT to the server and close the socket.
 * Does nothing when there is no open connection.
 */
public function sendDisconnect() {
    if (!$this->isConnected()) {
        return;
    }

    $disconnectHeader = $this->createHeader(self::MQTT_DISCONNECT);
    $this->debugMessage('Sending DISCONNECT');
    $this->send($disconnectHeader);
    $this->close();
}
|
|
|
|
/**
 * Close socket: shut the stream down in both directions and forget the
 * socket and the last server activity time. No-op when not connected.
 */
public function close() {
    if (!$this->isConnected()) {
        return;
    }

    $this->debugMessage('Closing socket');
    stream_socket_shutdown($this->socket, STREAM_SHUT_RDWR);
    $this->socket = null;
    $this->serverAliveTime = null;
}
|
|
|
|
/**
 * Check if connected to stream, i.e. whether a socket reference is currently held.
 *
 * @return boolean
 */
public function isConnected() {
    return (bool) $this->socket;
}
|
|
|
|
/**
 * Check if connection is alive.
 *
 * The connection counts as alive while a socket is held and the last recorded
 * traffic ($serverAliveTime) is no older than the keep-alive interval. The
 * previous comparison was inverted and only returned true once the keep-alive
 * interval had already expired.
 *
 * @return boolean
 */
public function isAlive() {
    if (!$this->isConnected() || $this->serverAliveTime === null) {
        return false;
    }
    return ($this->serverAliveTime + $this->connectKeepAlive) >= time();
}
|
|
|
|
/**
 * Switch debug mode on or off; while on, progress messages are printed by debugMessage().
 *
 * @param boolean $mode True (default) to enable, false to disable
 */
public function setDebug($mode = true) {
    $this->debug = $mode;
}
|
|
|
|
/**
 * Print a message, prefixed with "MQTT: " and followed by a line break,
 * when debug mode is on. Silent otherwise.
 *
 * @param string $message
 */
private function debugMessage($message) {
    if (!$this->debug) {
        return;
    }
    echo 'MQTT: '. $message .PHP_EOL;
}
|
|
|
|
/**
 * Return next packet identifier to use in MQTT packet and remember it as the current one.
 *
 * The identifier is limited to 2 bytes. When the end of the range is reached it
 * restarts at 1 rather than 0: MQTT packet identifiers must be non-zero, and
 * getPacketIdPayload() treats an empty (0) id as "not given".
 *
 * @return integer Value in the range 1..65535
 */
private function getNextPacketId() {
    $next = ($this->packetId + 1) & 0xffff;
    if ($next === 0) {
        $next = 1;
    }
    $this->packetId = $next;
    return $next;
}
|
|
|
|
/**
 * Return a packet id encoded as payload bytes (most significant byte first).
 * When no id (or an empty one) is given, the latest generated packet id is used.
 *
 * @param integer $packetId
 * @return string Two chars with payload to add in MQTT-message
 */
private function getPacketIdPayload($packetId = null) {
    $id = empty($packetId) ? $this->packetId : $packetId;

    $highByte = ($id >> 8) & 0xff;
    $lowByte = $id & 0xff;
    return chr($highByte) . chr($lowByte);
}
|
|
|
|
/**
 * Prefix a string with its byte length, encoded as two bytes (most significant
 * byte first), and return the result.
 *
 * @param string $payload
 * @return string
 */
private function createPayload($payload) {
    // 'n' packs an unsigned 16 bit value in big endian byte order
    return pack('n', strlen($payload)) . $payload;
}
|
|
|
|
/**
 * Decode a sequence of length-prefixed strings (2 length bytes, most significant
 * first, followed by that many bytes) and return them as a string array.
 *
 * Decoding stops at the first entry whose announced length exceeds the data
 * that is left; such a truncated entry is not returned.
 *
 * @param string $payload
 * @return string[]
 */
private function decodePayload($payload) {
    $result = [];
    $total = strlen($payload);
    $offset = 0;
    while ($total - $offset >= 2) {
        // Parentheses are required: "+" binds tighter than "<<"
        $length = (ord($payload[$offset]) << 8) + ord($payload[$offset + 1]);
        if ($total - $offset - 2 < $length) {
            break; // truncated entry, nothing more can be decoded
        }
        $result[] = $length > 0 ? substr($payload, $offset + 2, $length) : '';
        $offset += 2 + $length;
    }
    return $result;
}
|
|
|
|
/**
 * Write data to the open socket.
 * On a complete write the time is recorded as the last server activity.
 *
 * @param string $data
 * @return boolean Only returns true if all data was sent
 */
private function send($data) {
    if (!$this->socket) {
        return false;
    }

    $bytesWritten = fwrite($this->socket, $data);
    if (($bytesWritten === false) || ($bytesWritten != strlen($data))) {
        return false;
    }

    $this->serverAliveTime = time();
    return true;
}
|
|
|
|
/**
 * Read bytes from socket until x bytes read, eof reached or socket timeout.
 *
 * @param int $bytes Number of bytes to read
 * @return string|boolean The bytes read; on EOF whatever was read so far (the socket
 *                        is closed in that case); false when there is no socket or
 *                        the read timed out
 */
private function readBytes($bytes) {
    if (!$this->socket) return false;

    $buffer = '';
    $remaining = $bytes;
    while (true) {
        // If stream at end, close down socket and hand back what has been collected
        if (feof($this->socket)) {
            $this->debugMessage('Reached EOF for stream');
            $this->close();
            return $buffer;
        }

        // Try to read from stream
        $chunk = fread($this->socket, $remaining);
        if ($chunk !== false && strlen($chunk) > 0) {
            $buffer .= $chunk;
            $remaining -= strlen($chunk);
        }

        // All requested bytes received: note the server activity and return them
        if ($remaining <= 0) {
            $this->serverAliveTime = time();
            return $buffer;
        }

        // Give up when the stream reports a timeout
        $meta = stream_get_meta_data($this->socket);
        if ($meta['timed_out']) {
            $this->debugMessage('Read timeout');
            return false;
        }

        // Wait a while before trying to read again (usleep takes micro seconds)
        usleep($this->socketReadDelay * 1000);
    }
}
|
|
|
|
/**
 * Encode a length as the variable-length byte sequence sent in the stream:
 * 7 bits per byte, least significant group first, high bit set on every byte
 * that is followed by another one.
 *
 * @param integer $len
 * @return string|boolean Encoded bytes, or false for a negative length or one of 128^4 and above
 */
private function encodeLength($len) {
    $limit = 128*128*128*128;
    if ($len < 0 || $len >= $limit) {
        // illegal length
        return false;
    }

    $encoded = '';
    do {
        $digit = $len & 0x7f; // lowest 7 bits
        $len >>= 7;           // drop them from the remaining value
        // high bit marks "more bytes follow"
        $encoded .= chr($len > 0 ? ($digit | 0x80) : $digit);
    } while ($len > 0);

    return $encoded;
}
|
|
|
|
/**
 * Return length of packet by reading the variable-length field from the stream.
 *
 * Each byte contributes 7 bits (least significant group first); a set high bit
 * means another byte follows. At most 4 length bytes are accepted, matching the
 * limit enforced by encodeLength(); the previous check let a fifth byte through.
 *
 * @return integer|boolean Decoded length, or false if the field is malformed
 *                         or the stream ended / timed out
 */
private function readPacketLength() {
    $bytesRead = 0;
    $len = 0;
    $multiplier = 1;
    do {
        if ($bytesRead >= 4) {
            return false; // Malformed length: a fifth byte was announced
        }
        $str = $this->readBytes(1);
        if ($str === false || strlen($str) != 1) {
            return false; // Unexpected end of stream
        }
        $byte = ord($str[0]);
        $len += ($byte & 0x7f) * $multiplier;
        $isContinued = ($byte & 0x80);
        if ($isContinued) {
            $multiplier *= 128;
        }
        $bytesRead++;
    } while ($isContinued);
    return $len;
}
|
|
|
|
/**
 * Create MQTT header from command and payload: the command byte followed by
 * the encoded length of the payload.
 *
 * @param int    $command Command to send
 * @param string $payload Payload to be sent
 *
 * @return string Header to send
 */
private function createHeader($command, $payload = '') {
    $commandByte = chr($command);
    $lengthBytes = $this->encodeLength(strlen($payload));
    return $commandByte . $lengthBytes;
}
|
|
|
|
/**
 * Read next packet from stream.
 *
 * Bytes whose high nibble is 0 are skipped while looking for the header byte.
 * The outcome is recorded in $lastReadStatus.
 *
 * @return string|boolean Header byte followed by the packet body (the length
 *                        field is not included), or false if reading failed
 */
private function readNextPacket() {
    // Find the header byte; 0 is illegal control code to start with
    while (true) {
        $header = $this->readBytes(1);
        if ($header === false) {
            $this->lastReadStatus = self::READ_STATUS_ERROR_HEADER;
            return false;
        }
        if ((ord($header) & 0xf0) != 0) {
            break;
        }
    }

    $bodyLength = $this->readPacketLength();
    if ($bodyLength === false) {
        $this->debugMessage('Could not decode packet length');
        $this->lastReadStatus = self::READ_STATUS_ERROR_PACKETLENGTH;
        return false;
    }

    // Only touch the stream again when the packet actually has a body
    $body = '';
    if ($bodyLength > 0) {
        $body = $this->readBytes($bodyLength);
    }
    if ($body === false) {
        $this->lastReadStatus = self::READ_STATUS_ERROR_PAYLOAD;
        return false;
    }

    $packet = $header . $body;
    $this->debugMessage('Packet response: '. self::str2hex($packet));
    $this->lastReadStatus = self::READ_STATUS_OK;
    return $packet;
}
|
|
|
|
/**
 * Return the status recorded by the last attempt to read a packet
 * (one of the READ_STATUS_* constants).
 *
 * @return integer
 */
public function getLastReadStatus() {
    $status = $this->lastReadStatus;
    return $status;
}
|
|
|
|
/**
 * Tell whether another read attempt is worthwhile: true while still connected
 * and the last read ended with READ_STATUS_OK.
 *
 * @return boolean
 */
public function hasMoreToRead() {
    if ($this->lastReadStatus != self::READ_STATUS_OK) {
        return false;
    }
    return $this->isConnected();
}
|
|
|
|
/**
 * Read packets from stream and save to queue. Quit after x packets or when a read fails.
 *
 * @param integer $maxPackets Maximum number of packets to read in this call
 * @return integer Number of packets read
 */
private function readPackets($maxPackets = 100) {
    for ($count = 0; $count < $maxPackets; $count++) {
        $packet = $this->readNextPacket();
        if ($packet === false) {
            break;
        }
        $this->packetQueue[] = $packet;
    }
    return $count;
}
|
|
|
|
/**
 * Wait until a certain packet is found in the stream.
 * Queued packets are checked first; other packets read while waiting are
 * saved in the queue.
 *
 * @param byte    $header         Header to look for (only 4 high bits) 0xf0
 * @param integer $verifyPacketId Packet id the message must match (false to skip the check)
 * @return string|boolean The matching packet, or false if reading from the stream failed
 */
private function waitForPacket($header, $verifyPacketId = false) {
    // First look through the packets that are still unhandled
    foreach ($this->packetQueue as $index => $queued) {
        if (!$this->isPacketVerified($queued, $header, $verifyPacketId)) {
            continue;
        }
        // Found: take it out of the queue and hand it back
        unset($this->packetQueue[$index]);
        return $queued;
    }

    // Not queued: keep reading from the stream until found or the read fails
    while (true) {
        $incoming = $this->readNextPacket();
        if ($incoming === false || empty($incoming)) {
            return false;
        }
        if ($this->isPacketVerified($incoming, $header, $verifyPacketId)) {
            return $incoming;
        }
        // A different packet arrived, keep it for later
        $this->packetQueue[] = $incoming;
    }
}
|
|
|
|
/**
 * Check if packet is of a given type and, when requested, carries the expected packet id.
 *
 * Only the high nibble of the first byte is compared with $header. The packet id
 * is taken from bytes 1 and 2 of the packet (most significant byte first).
 *
 * @param string  $packet
 * @param char    $header
 * @param integer $verifyPacketId Expected packet id, or false to skip the id check
 * @return boolean
 */
private function isPacketVerified($packet, $header, $verifyPacketId = false) {
    if (!is_string($packet) || strlen($packet) < 1) {
        return false;
    }

    $packetType = (int)(ord($packet[0]) & 0xf0);
    $wantedType = (int)($header & 0xf0);
    if ($packetType != $wantedType) {
        return false;
    }

    if ($verifyPacketId === false) {
        return true;
    }

    // An id can only be present when the packet has at least three bytes
    if (strlen($packet) < 3) {
        return false;
    }
    $receivedPacketId = (int)(ord($packet[1])<<8) + ord($packet[2]);
    return $verifyPacketId == $receivedPacketId;
}
|
|
|
|
/**
 * Get packets matching a header from the queue and remove them from the queue.
 * Packets of other types stay queued under their existing keys.
 *
 * @param char $header
 * @return string[]
 */
public function getQueuePackets($header) {
    $matches = [];
    foreach (array_keys($this->packetQueue) as $key) {
        $candidate = $this->packetQueue[$key];
        if (!$this->isPacketVerified($candidate, $header)) {
            continue;
        }
        $matches[] = $candidate;
        unset($this->packetQueue[$key]);
    }
    return $matches;
}
|
|
|
|
/**
 * Get PUBLISH packets and return them as messages.
 *
 * Reads up to $maxMessages packets from the stream into the queue, then decodes
 * every queued PUBLISH packet. A message is dropped from the result when a
 * requested confirmation exchange fails.
 *
 * @param integer $maxMessages Max packets to read from the stream
 * @param boolean $sendPubAck  If true, then send PUBACK to MQTT-server (QoS 1)
 * @param boolean $sendPubRec  If true, then send PUBREC to MQTT-server, wait for PUBREL and send PUBCOMP (QoS 2)
 * @return array[] All PUBLISH messages (as returned by decodePublish()) which were
 *                 confirmed or for which no confirmation was needed/wanted
 */
public function getPublishMessages($maxMessages = 100, $sendPubAck = false, $sendPubRec = false) {
    $this->readPackets($maxMessages);

    $accepted = [];
    foreach ($this->getQueuePackets(self::MQTT_PUBLISH) as $packet) {
        $message = $this->decodePublish($packet);
        if ($message === false) {
            $this->debugMessage('Message could not be decoded');
            continue;
        }

        $confirmQos1 = $sendPubAck && ($message['qos'] == self::MQTT_QOS1);
        $confirmQos2 = !$confirmQos1 && $sendPubRec && ($message['qos'] == self::MQTT_QOS2);

        if ($confirmQos1) {
            if ($this->sendPubAck($message['packetId']) === false) {
                $this->debugMessage('Failed to send PUBACK');
                continue;
            }
        } elseif ($confirmQos2) {
            // Send PUBREC
            if ($this->sendPubRec($message['packetId']) === false) {
                $this->debugMessage('Failed to send PUBREC');
                continue;
            }
            // A PUBREL packet is expected
            if ($this->waitForPacket(self::MQTT_PUBREL, $message['packetId']) === false) {
                $this->debugMessage('Packet missing, expecting PUBREL');
                continue;
            }
            // Send PUBCOMP
            if ($this->sendPubComp($message['packetId']) === false) {
                $this->debugMessage('Failed to send PUBCOMP');
                continue;
            }
        }

        // Message was successfully confirmed or no confirmation needed/wanted --> store it
        $accepted[] = $message;
    }
    return $accepted;
}
|
|
|
|
/**
 * Decode a publish packet to its attributes.
 *
 * Expected layout of $packet (as produced by readNextPacket(), i.e. without the
 * length field): header byte, 2 byte topic length, topic, then for QoS 1 and 2
 * a 2 byte packet id, then the message.
 *
 * The low nibble of the header byte holds the flags as bit fields:
 * bit 3 = DUP, bits 2-1 = QoS, bit 0 = RETAIN. DUP and RETAIN are therefore
 * tested with a bit mask; the former equality tests could never report a
 * duplicate and missed RETAIN whenever another flag was set.
 *
 * @param string $packet
 * @return array|boolean Array with keys 'topic', 'message', 'retain', 'duplicate',
 *                       'qos' and 'packetId' (null when none was found), or false
 *                       if the packet could not be decoded
 */
public function decodePublish($packet) {
    if (!is_string($packet) || (strlen($packet) <= 3)) {
        return false;
    }
    $flags = ord($packet[0]) & 0x0f;
    $duplicate = (($flags & 0x08) !== 0);
    $retain = (($flags & 0x01) !== 0);
    $qos = ($flags>>1) & 0x03;

    $topicLength = (ord($packet[1])<<8) + ord($packet[2]);
    if (strlen($packet) < 3 + $topicLength) {
        // Announced topic is longer than the packet itself
        return false;
    }
    $topic = $topicLength > 0 ? substr($packet, 3, $topicLength) : '';

    // Everything after the topic; cast guards against substr() returning false at the very end
    $payload = (string) substr($packet, 3 + $topicLength);
    if ($qos == 0) {
        // no packet id for QoS 0, the payload is the message
        $message = $payload;
        $packetId = null;
    } elseif (strlen($payload) >= 2) {
        $packetId = (ord($payload[0])<<8) + ord($payload[1]);
        $message = (string) substr($payload, 2); // skip packet id (2 bytes) for QoS 1 and 2
    } else {
        // 2 byte packet id required, but not found. exit gracefully (no failure)
        $packetId = null;
        $message = '';
    }

    return [
        'topic' => self::convertActiveMqTopic($topic),
        'message' => $message,
        'retain' => $retain,
        'duplicate' => $duplicate,
        'qos' => $qos,
        'packetId' => $packetId,
    ];
}
|
|
|
|
/**
 * Replace ActiveMQ special characters with their MQTT-standard counterparts:
 * "." becomes "/", "*" becomes "+" and ">" becomes "#".
 *
 * @param string $topic
 * @return string
 */
private static function convertActiveMqTopic($topic) {
    $activeMqChars = ['.', '*', '>'];
    $mqttChars = ['/', '+', '#'];
    return str_replace($activeMqChars, $mqttChars, $topic);
}
|
|
|
|
/**
 * Return a string rendered as hex values followed by its ASCII form in quotes.
 * Every byte is printed as an unpadded hex number plus a space; bytes outside
 * 0x20-0x7f are shown as "." in the ASCII part.
 * Good for displaying received packets.
 *
 * @param string $str
 * @return string
 */
private function str2hex($str) {
    $hexPart = '';
    $asciiPart = '';
    $length = strlen($str);
    for ($pos = 0; $pos < $length; $pos++) {
        $code = ord($str[$pos]);
        $isShown = ($code >= 0x20 && $code <= 0x7f);
        $asciiPart .= $isShown ? $str[$pos] : '.';
        $hexPart .= dechex($code).' ';
    }
    return $hexPart . '"'. $asciiPart .'"';
}
|
|
|
|
/**
 * Print every packet that is still waiting in the queue, one per line, in the
 * hex + ASCII form produced by str2hex(). The queue itself is not modified.
 *
 * The former body built each line but never echoed it, so nothing was printed.
 */
public function dumpQueue() {
    foreach ($this->packetQueue as $packet) {
        echo $this->str2hex($packet) . PHP_EOL;
    }
}
|
|
}
|