You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1097 lines
37 KiB

  1. <?php
  2. declare(strict_types=1);
  3. namespace App\Libs;
  4. class MQTTClient {
  5. // MQTT control packet types (here left shifted 4 bits)
  6. const MQTT_CONNECT = 0x10; // Client request to connect to Server
  7. const MQTT_CONNACK = 0x20; // Connect acknowledgment
  8. const MQTT_PUBLISH = 0x30; // Publish message
  9. const MQTT_PUBACK = 0x40; // Publish acknowledgment
  10. const MQTT_PUBREC = 0x50; // Publish received (assured delivery part 1)
  11. const MQTT_PUBREL = 0x62; // Publish release (assured delivery part 2)
  12. const MQTT_PUBCOMP = 0x70; // Publish complete (assured delivery part 3)
  13. const MQTT_SUBSCRIBE = 0x80; // Client subscribe request
  14. const MQTT_SUBACK = 0x90; // Subscribe acknowledgment
  15. const MQTT_UNSUBSCRIBE = 0xa0; // Unsubscribe request
  16. const MQTT_UNSUBACK = 0xb0; // Unsubscribe acknowledgment
  17. const MQTT_PINGREQ = 0xc0; // PING request
  18. const MQTT_PINGRESP = 0xd0; // PING response
  19. const MQTT_DISCONNECT = 0xe0; // Client is disconnecting
  20. // MQTT quality of service levels
  21. const MQTT_QOS0 = 0x00;
  22. const MQTT_QOS1 = 0x01;
  23. const MQTT_QOS2 = 0x02;
  24. // MQTT status on last read from stream
  25. const READ_STATUS_NO_READ = 0;
  26. const READ_STATUS_OK = 200;
  27. const READ_STATUS_ERROR = 400;
  28. const READ_STATUS_ERROR_HEADER = 401;
  29. const READ_STATUS_ERROR_PACKETLENGTH = 402;
  30. const READ_STATUS_ERROR_PAYLOAD = 403;
  31. private $socket = null; // Socket resource reference
  32. private $socketTimeout; // Default socket timeout in milliseconds
  33. private $socketReadDelay = 1000; // Delay in milliseconds between read attempts on socket
  34. private $protocol = 'tcp';
  35. private $serverAddress;
  36. private $serverPort;
  37. private $clientId;
  38. public $caFile = null;
  39. public $localCert = null;
  40. public $localPrivateKey = null;
  41. private $connectCleanSession;
  42. private $connectKeepAlive;
  43. private $connectWill = false;
  44. private $connectWillQos;
  45. private $connectWillRetain;
  46. private $connectUsername;
  47. private $connectPassword;
  48. private $willTopic;
  49. private $willMessage;
  50. private $packetId; // Packet identifier to validate return packets
  51. private $pingReqTime; // Time when last PINGREQ was sent
  52. private $serverAliveTime; // Time for last response from server
  53. private $debug = false;
  54. private $lastReadStatus = self::READ_STATUS_NO_READ;
  55. private $packetQueue = []; // Queue for received but unhandled packages
  56. public $lastConnectResult = 0;
  57. /**
  58. * Class constructor - Sets up connection parameters
  59. *
  60. * @param string $address Address to server
  61. * @param string $port Port to server
  62. * @param string $protocol Which protocol to use
  63. */
  64. function __construct($address, $port=null, $protocol='tcp'){
  65. if ($this->setConnection($address, $port, $protocol)) {
  66. ;
  67. }
  68. $this->packetId = rand(1,100)*100; // Reduce risk of creating duplicate ids in sequential sessions
  69. }
  70. /**
  71. * Class destructor - Close socket
  72. */
  73. function __destruct(){
  74. $this->close();
  75. }
  76. /**
  77. * Setup conection parameters
  78. *
  79. * @param string $address
  80. * @param string $port
  81. * @param string $protocol
  82. * @return boolean If return false then using default parameters where validation failed
  83. */
  84. function setConnection($address, $port=null, $protocol='tcp'){
  85. $this->serverAddress = $address;
  86. $this->serverPort = $port;
  87. // Validate protocol
  88. $protocol = strtolower($protocol);
  89. if (($protocol != 'tcp') && !self::isEncryptionProtocol($protocol)) {
  90. $this->debugMessage('Invalid protocol ('.$protocol.'). Setting to default (tcp).');
  91. $this->protocol = 'tcp';
  92. return false;
  93. }
  94. $this->protocol = $protocol;
  95. return true;
  96. }
  97. /**
  98. * Build url for connecting to stream
  99. *
  100. * @return string
  101. */
  102. private function getUrl() {
  103. $url = '';
  104. if ($this->protocol) $url .= $this->protocol .'://';
  105. $url .= $this->serverAddress;
  106. if ($this->serverPort) $url .= ':'. $this->serverPort;
  107. return $url;
  108. }
  109. /**
  110. * Check if encryption protocol is supported
  111. *
  112. * @param string $protcol
  113. * @return boolean
  114. */
  115. private static function isEncryptionProtocol($protocol) {
  116. return in_array(strtolower($protocol), ['ssl', 'tls', 'tlsv1.0', 'tlsv1.1', 'tlsv1.2', 'sslv3']);
  117. }
  118. /**
  119. * Sets server certificate and protocol for ssl/tls encryption
  120. *
  121. * @param string $caFile CA file to identify server
  122. * @param string $protocl Crypto protocol (See http://php.net/manual/en/migration56.openssl.php)
  123. * @return boolean False if settings failed, else true
  124. */
  125. public function setEncryption($caFile, $protocol = null) {
  126. if (file_exists($caFile)) {
  127. $this->caFile = $caFile;
  128. } else {
  129. $this->debugMessage('CA file not found');
  130. return false;
  131. }
  132. if(self::isEncryptionProtocol($protocol)) {
  133. $this->protocol = $protocol;
  134. } else if (!is_null($protocol)) {
  135. $this->debugMessage('Unknown encryption protocol');
  136. return false;
  137. }
  138. return true;
  139. }
  140. /**
  141. * Sets client crt and key files for client-side authentication
  142. *
  143. * @param string $crtFile Client certificate file
  144. * @param string $keyFile Client key file
  145. * @return boolean False if settings failed, else true
  146. */
  147. public function setClientEncryption($certificateFile, $keyFile) {
  148. if (!file_exists($certificateFile)) {
  149. $this->debugMessage('Client certificate file not found');
  150. return false;
  151. }
  152. if (!file_exists($keyFile)) {
  153. $this->debugMessage('Client key file not found');
  154. return false;
  155. }
  156. $this->localCert= $certificateFile;
  157. $this->localPrivateKey = $keyFile;
  158. return true;
  159. }
  160. /**
  161. * Set authentication details to be used when connecting
  162. *
  163. * @param string $username Username
  164. * @param string $password Password
  165. */
  166. public function setAuthentication($username, $password) {
  167. $this->connectUsername= $username;
  168. $this->connectPassword = $password;
  169. }
  170. /**
  171. * Set will (last message defined by MQTT) to send when connection is lost
  172. *
  173. * @param string $topic
  174. * @param string $message
  175. * @param integer $qos
  176. * @param boolean $retain
  177. */
  178. public function setWill($topic, $message, $qos=1, $retain=false) {
  179. $this->connectWill = true;
  180. $this->connectWillQos = $qos;
  181. $this->connectWillRetain = $retain;
  182. $this->willTopic = $topic;
  183. $this->willMessage = $message;
  184. }
  185. /**
  186. * Connect to MQTT server
  187. *
  188. * @param string $clientId Unique id used by the server to identify the client
  189. * @param boolean $cleanSession Set true to clear session on server, ie queued messages are purged (not recieved)
  190. * @param integer $keepAlive Number of seconds a connection is considered to be alive without traffic
  191. * @param integer $timeout Number of millliseconds before timeout when reading from socket
  192. * @return boolean Returns false if connection failed
  193. */
  194. public function sendConnect($clientId, $cleanSession=false, $keepAlive=10, $timeout=5000) {
  195. if (!$this->serverAddress) return false;
  196. // Basic validation of clientid
  197. // Note: A MQTT server may accept other chars and more than 23 chars in the clientid but that is optional,
  198. // all chars below up to 23 chars are required to be accepted (see section "3.1.3.1 Client Identifier" of the standard)
  199. if(preg_match("/[^0-9a-zA-Z]/",$clientId)) {
  200. $this->debugMessage('ClientId can only contain characters 0-9,a-z,A-Z');
  201. return false;
  202. }
  203. if(strlen($clientId) > 23) {
  204. $this->debugMessage('ClientId max length is 23 characters/numbers');
  205. return false;
  206. }
  207. $this->clientId = $clientId;
  208. $this->connectCleanSession = $cleanSession;
  209. $this->connectKeepAlive = $keepAlive;
  210. $this->socketTimeout = $timeout;
  211. // Setup certificates if encryption protocol selected
  212. if ($this->isEncryptionProtocol($this->protocol)) {
  213. $mozillaCiphers = implode(':', array(
  214. 'ECDHE-RSA-AES128-GCM-SHA256',
  215. 'ECDHE-ECDSA-AES128-GCM-SHA256',
  216. 'ECDHE-RSA-AES256-GCM-SHA384',
  217. 'ECDHE-ECDSA-AES256-GCM-SHA384',
  218. 'DHE-RSA-AES128-GCM-SHA256',
  219. 'DHE-DSS-AES128-GCM-SHA256',
  220. 'kEDH+AESGCM',
  221. 'ECDHE-RSA-AES128-SHA256',
  222. 'ECDHE-ECDSA-AES128-SHA256',
  223. 'ECDHE-RSA-AES128-SHA',
  224. 'ECDHE-ECDSA-AES128-SHA',
  225. 'ECDHE-RSA-AES256-SHA384',
  226. 'ECDHE-ECDSA-AES256-SHA384',
  227. 'ECDHE-RSA-AES256-SHA',
  228. 'ECDHE-ECDSA-AES256-SHA',
  229. 'DHE-RSA-AES128-SHA256',
  230. 'DHE-RSA-AES128-SHA',
  231. 'DHE-DSS-AES128-SHA256',
  232. 'DHE-RSA-AES256-SHA256',
  233. 'DHE-DSS-AES256-SHA',
  234. 'DHE-RSA-AES256-SHA',
  235. 'AES128-GCM-SHA256',
  236. 'AES256-GCM-SHA384',
  237. 'ECDHE-RSA-RC4-SHA',
  238. 'ECDHE-ECDSA-RC4-SHA',
  239. 'AES128',
  240. 'AES256',
  241. 'RC4-SHA',
  242. 'HIGH',
  243. '!aNULL',
  244. '!eNULL',
  245. '!EXPORT',
  246. '!DES',
  247. '!3DES',
  248. '!MD5',
  249. '!PSK'
  250. ));
  251. // Secure socket communication with these parameters, a ca-file is required
  252. $options = [];
  253. $options['verify_peer'] = true;
  254. $options['verify_peer_name'] = true;
  255. $options['verify_depth'] = 5;
  256. $options['disable_compression'] = true;
  257. $options['SNI_enabled'] = true;
  258. $options['ciphers'] = $mozillaCiphers;
  259. if($this->caFile) {
  260. $options['cafile'] = $this->caFile;
  261. }
  262. if($this->localCert) {
  263. $options['local_cert'] = $this->localCert;
  264. if ($this->localPrivateKey) {
  265. $options['local_pk'] = $this->localPrivateKey;
  266. }
  267. }
  268. $socketContext = stream_context_create(['ssl' => $options]);
  269. $this->debugMessage('Settings socket options: '. var_export($options, true));
  270. } else {
  271. $socketContext = null;
  272. }
  273. // Try to open socket
  274. try {
  275. $this->debugMessage('Opening socket to: '. $this->getUrl());
  276. if ($socketContext) {
  277. $this->socket = stream_socket_client($this->getUrl(), $errno, $errstr, 10, STREAM_CLIENT_CONNECT, $socketContext);
  278. } else {
  279. $this->socket = stream_socket_client($this->getUrl(), $errno, $errstr, 10, STREAM_CLIENT_CONNECT);
  280. }
  281. } catch (\ErrorException $error) {
  282. $this->debugMessage('Exception: Could not open stream with error message: '. $error->getMessage());
  283. $this->socket = null;
  284. return false;
  285. }
  286. // Check if socket was opened successfully
  287. if ($this->socket === false) {
  288. $this->socket = null;
  289. $this->debugMessage('Connection failed. Error-no:'. $errno .' Error message: '. $errstr);
  290. return false;
  291. }
  292. // Set socket timeout
  293. ini_set('default_socket_timeout', '10');
  294. stream_set_timeout($this->socket, 0, $this->socketTimeout * 1000);
  295. // Set stream to non-blocking mode, ie do not wait to read if stream is empty
  296. stream_set_blocking($this->socket, true);
  297. // Calculate connect flags to use in CONNECT header
  298. $connectFlags = 0;
  299. if ($this->connectCleanSession) $connectFlags += 0x02;
  300. if ($this->connectWill) {
  301. $connectFlags += 0x04;
  302. if ($this->connectWillQos) $connectFlags += ($this->connectWill << 3);
  303. if ($this->connectWillRetain) $connectFlags += 0x20;
  304. }
  305. if ($this->connectUsername) {
  306. $connectFlags += 0x80;
  307. if ($this->connectPassword) $connectFlags += 0x40;
  308. }
  309. // Build payload and header for CONNECT-packet
  310. $payload = chr(0x00).chr(0x04); // MSB & LSB length of MQTT = 4
  311. $payload .= 'MQTT';
  312. $payload .= chr(0x04); // Protocol level (3.1.1)
  313. $payload .= chr($connectFlags); // Connect flags
  314. $payload .= chr($this->connectKeepAlive >> 8); // Keepalive (MSB)
  315. $payload .= chr($this->connectKeepAlive & 0xff); // Keepalive (LSB)
  316. if ($this->connectCleanSession && empty($this->clientId)) {
  317. $this->clientId = rand(1,999999999);
  318. }
  319. if ($this->clientId) {
  320. $payload .= $this->createPayload($this->clientId);
  321. }
  322. if($this->connectWill){
  323. $payload .= $this->createPayload($this->willTopic);
  324. $payload .= $this->createPayload($this->willMessage);
  325. }
  326. if($this->connectUsername) {
  327. $payload .= $this->createPayload($this->connectUsername);
  328. }
  329. if ($this->connectPassword) {
  330. $payload .= $this->createPayload($this->connectPassword);
  331. }
  332. $header = $this->createHeader(self::MQTT_CONNECT, $payload);
  333. $this->debugMessage('Sending CONNECT');
  334. $this->send($header . $payload);
  335. // Wait for CONNACK packet
  336. $response = $this->waitForPacket(self::MQTT_CONNACK);
  337. if($response !== false && ($response[2] == chr(0))) {
  338. $this->debugMessage('Connected to MQTT');
  339. $this->lastConnectResult = 0;
  340. return true;
  341. } else {
  342. $this->debugMessage('Connection failed! Error: '. ord($response[2]));
  343. $this->lastConnectResult = ord($response[2]);
  344. $this->close();
  345. return false;
  346. }
  347. }
  348. /**
  349. * Publish a topic and message (QoS 0,1,2 supported)
  350. *
  351. * @param string $topic
  352. * @param string $message
  353. * @param byte $qos
  354. * @return boolean
  355. */
  356. public function sendPublish($topic, $message, $qos = self::MQTT_QOS1, $retain = 0) {
  357. if(!$this->isConnected()) return false;
  358. if($qos!=self::MQTT_QOS0 && $qos!=self::MQTT_QOS1 && $qos!=self::MQTT_QOS2) return false;
  359. $packetId = $this->getNextPacketId();
  360. $payload = $this->createPayload($topic);
  361. if($qos >= self::MQTT_QOS1) {
  362. // Packet identifier required for QoS level >= 1
  363. $payload .= $this->getPacketIdPayload();
  364. }
  365. $payload .= $message;
  366. $dupFlag = 0;
  367. $header = $this->createHeader(self::MQTT_PUBLISH + ($dupFlag<<3) + ($qos<<1) + $retain, $payload);
  368. $this->debugMessage('Sending PUBLISH');
  369. $this->send($header . $payload);
  370. if($qos == self::MQTT_QOS1) {
  371. // If QoS level 1, only a PUBACK packet is expected
  372. $response = $this->waitForPacket(self::MQTT_PUBACK, $packetId);
  373. if($response === false) {
  374. $this->debugMessage('Packet missing, expecting PUBACK');
  375. return false;
  376. }
  377. } elseif($qos == self::MQTT_QOS2) {
  378. // If QoS level 2, a PUBREC packet is expected
  379. $response = $this->waitForPacket(self::MQTT_PUBREC, $packetId);
  380. if($response === false) {
  381. $this->debugMessage('Packet missing, expecting PUBREC');
  382. return false;
  383. }
  384. // Send PUBREL
  385. $response = $this->sendPubRel($packetId);
  386. if($response === false) {
  387. $this->debugMessage('Failed to send PUBREL');
  388. return false;
  389. }
  390. // A PUBCOMP packet is expected
  391. $response = $this->waitForPacket(self::MQTT_PUBCOMP, $packetId);
  392. if($response === false) {
  393. $this->debugMessage('Packet missing, expecting PUBCOMP');
  394. return false;
  395. }
  396. }
  397. return true;
  398. }
  399. /**
  400. * Send PUBACK as response to a recieved PUBLISH packet (QoS Level 1)
  401. *
  402. * @param integer $packetId Packet identifier of PUBLISH packet
  403. * @return boolean Returns true if packet sent successfully
  404. */
  405. public function sendPubAck($packetId) {
  406. if(!$this->isConnected()) return false;
  407. $payload = chr(($packetId & 0xff00)>>8) . chr($packetId & 0xff);
  408. $header = $this->createHeader(self::MQTT_PUBACK, $payload);
  409. $this->debugMessage('Sending PUBACK');
  410. $this->send($header . $payload);
  411. return true;
  412. }
  413. /**
  414. * Send PUBREC as response to a recieved PUBLISH packet (QoS Level 2)
  415. *
  416. * @param integer $packetId Packet identifier of PUBLISH packet
  417. * @return boolean Returns true if packet sent successfully
  418. */
  419. public function sendPubRec($packetId) {
  420. if(!$this->isConnected()) return false;
  421. $payload = chr(($packetId & 0xff00)>>8) . chr($packetId & 0xff);
  422. $header = $this->createHeader(self::MQTT_PUBREC, $payload);
  423. $this->debugMessage('Sending PUBREC');
  424. $this->send($header . $payload);
  425. return true;
  426. }
  427. /**
  428. * Send PUBREL as response to a recieved PUBREC packet (QoS Level 2)
  429. *
  430. * @param integer $packetId Packet identifier of PUBLISH packet
  431. * @return boolean Returns true if packet sent successfully
  432. */
  433. public function sendPubRel($packetId) {
  434. if(!$this->isConnected()) return false;
  435. $payload = chr(($packetId & 0xff00)>>8) . chr($packetId & 0xff);
  436. $header = $this->createHeader(self::MQTT_PUBREL, $payload);
  437. $this->debugMessage('Sending PUBREL');
  438. $this->send($header . $payload);
  439. return true;
  440. }
  441. /**
  442. * Send PUBCOMP as response to a recieved PUBREL packet (QoS Level 2)
  443. *
  444. * @param integer $packetId Packet identifier of PUBLISH packet
  445. * @return boolean Returns true if packet sent successfully
  446. */
  447. public function sendPubComp($packetId) {
  448. if(!$this->isConnected()) return false;
  449. $payload = chr(($packetId & 0xff00)>>8) . chr($packetId & 0xff);
  450. $header = $this->createHeader(self::MQTT_PUBCOMP, $payload);
  451. $this->debugMessage('Sending PUBCOMP');
  452. $this->send($header . $payload);
  453. return true;
  454. }
  455. /**
  456. * Subscribe to topics with a quality of service
  457. *
  458. * @param string[] $topics Topics to subscribe for
  459. * @param integer $qos Quality of serivce for all topics
  460. * @return boolean Returns true if SUBACK was recieved
  461. */
  462. public function sendSubscribe($topics, $qos = self::MQTT_QOS1) {
  463. if (!is_array($topics)) $topics = [$topics];
  464. if(!$this->isConnected()) return false;
  465. $packetId = $this->getNextPacketId();
  466. $payload = $this->getPacketIdPayload();
  467. foreach($topics as $topic) {
  468. $payload .= $this->createPayload($topic);
  469. $payload .= chr($qos);
  470. }
  471. $header = $this->createHeader(self::MQTT_SUBSCRIBE + 0x02, $payload);
  472. $this->debugMessage('Sending SUBSCRIBE');
  473. $this->send($header . $payload);
  474. // A SUBACK packet is expected
  475. $response = $this->waitForPacket(self::MQTT_SUBACK, $packetId);
  476. if($response === false) {
  477. $this->debugMessage('Packet missing, expecting SUBACK');
  478. return false;
  479. }
  480. $responsePayload = substr($response, 3); // skip header and identifier (3 bytes)
  481. if (strlen($responsePayload) != count($topics)) {
  482. $this->debugMessage('Did not recieve SUBACK for all topics');
  483. return false;
  484. }
  485. // Check which subscriptions that were approved
  486. $topicsResult = [];
  487. $i = 0;
  488. foreach ($topics as $topic) {
  489. $topicsResult[$topic] = [];
  490. if ($responsePayload[$i] > 0x02) {
  491. $topicsResult[$topic]['success'] = false;
  492. $topicsResult[$topic]['qosGiven'] = null;
  493. } else {
  494. $topicsResult[$topic]['success'] = true;
  495. $topicsResult[$topic]['qosGiven'] = (int) ord($responsePayload[$i]);
  496. }
  497. $i++;
  498. }
  499. return $topicsResult;
  500. }
  501. /**
  502. * Send unsubscribe packet for given topics
  503. *
  504. * @param string[] $topics
  505. * @return boolean Returns true if UNSUBACK was recieved
  506. */
  507. public function sendUnsubscribe($topics) {
  508. if(!$this->isConnected()) return false;
  509. $packetId = $this->getNextPacketId();
  510. $payload = $this->getPacketIdPayload();
  511. foreach($topics as $topic) {
  512. $payload .= $this->createPayload($topic);
  513. }
  514. $header = $this->createHeader(self::MQTT_UNSUBSCRIBE + 0x02, $payload);
  515. $this->debugMessage('Sending UNSUBSCRIBE');
  516. $this->send($header . $payload);
  517. // An UNSUBACK packet is expected
  518. $response = $this->waitForPacket(self::MQTT_UNSUBACK, $packetId);
  519. if($response === false) {
  520. $this->debugMessage('Invalid packet received, expecting UNSUBACK');
  521. return false;
  522. }
  523. return true;
  524. }
  525. /**
  526. * Sends PINGREQ packet to server
  527. *
  528. * @return boolean Returns true if PINGRESP was recieved
  529. */
  530. public function sendPing() {
  531. if(!$this->isConnected()) return false;
  532. $this->timeSincePingReq = time();
  533. $header = $this->createHeader(self::MQTT_PINGREQ);
  534. $this->debugMessage('Sending PING');
  535. $this->send($header);
  536. $this->pingReqTime = time();
  537. // A PINGRESP packet is expected
  538. $response = $this->waitForPacket(self::MQTT_PINGRESP);
  539. if($response === false) {
  540. $this->debugMessage('Invalid packet received, expecting PINGRESP');
  541. return false;
  542. }
  543. return true;
  544. }
  545. /**
  546. * Send disconnect and close socket
  547. */
  548. public function sendDisconnect() {
  549. if($this->isConnected()) {
  550. $header = $this->createHeader(self::MQTT_DISCONNECT);
  551. $this->debugMessage('Sending DISCONNECT');
  552. $this->send($header);
  553. $this->close();
  554. }
  555. }
  556. /**
  557. * Close socket
  558. */
  559. public function close() {
  560. if($this->isConnected()) {
  561. $this->debugMessage('Closing socket');
  562. stream_socket_shutdown($this->socket, STREAM_SHUT_RDWR);
  563. $this->socket = null;
  564. $this->serverAliveTime = null;
  565. }
  566. }
  567. /**
  568. * Check if connected to stream
  569. * @return boolean
  570. */
  571. public function isConnected() {
  572. return !empty($this->socket);
  573. }
  574. /**
  575. * Check if connection is alive
  576. * @return boolean
  577. */
  578. public function isAlive() {
  579. return $this->isConnected() && ($this->serverAliveTime + $this->connectKeepAlive <= time());
  580. }
  581. /**
  582. * Set debug mode, if true then output progress messages
  583. *
  584. * @param boolean $mode
  585. */
  586. public function setDebug($mode = true) {
  587. $this->debug = $mode;
  588. }
  589. /**
  590. * Print message to console if debug mode is on
  591. *
  592. * @param string $message
  593. */
  594. private function debugMessage($message) {
  595. if ($this->debug) {
  596. echo 'MQTT: '. $message .PHP_EOL;
  597. }
  598. }
  599. /**
  600. * Return next packet identifier to use in MQTT packet
  601. * Max 2 bytes to be used, restart on 0 if end reached
  602. *
  603. * @return integer
  604. */
  605. private function getNextPacketId() {
  606. return ($this->packetId = ($this->packetId + 1) & 0xffff);
  607. }
  608. /**
  609. * Return payload of packet id, use latest generated packet id as default
  610. *
  611. * @param integer $packetId
  612. * @return string Two chars with apyload to add in MQTT-message
  613. */
  614. private function getPacketIdPayload($packetId = null) {
  615. if (empty($packetId)) $packetId = $this->packetId;
  616. return chr(($packetId & 0xff00)>>8) . chr($packetId & 0xff);
  617. }
  618. /**
  619. * Add payload length as bytes to begining of string and return
  620. *
  621. * @param string $payload
  622. * @return string
  623. */
  624. private function createPayload($payload) {
  625. $fullLength = strlen($payload);
  626. $retval = chr($fullLength>>8).chr($fullLength&0xff).$payload;
  627. return $retval;
  628. }
  629. /**
  630. * Decode payload using inital length (2 bytes) and return as string array
  631. *
  632. * @param string $payload
  633. * @return string[]
  634. */
  635. private function decodePayload($payload) {
  636. $result = [];
  637. while (strlen($payload) >= 2) {
  638. $length = ord($payload[0])<<8 + ord($payload[1]);
  639. if (strlen($payload) <= $length + 2) {
  640. $result[] = substr($payload, 2, $length);
  641. }
  642. $payload = substr($payload, min($length + 2, strlen($payload)));
  643. }
  644. return $result;
  645. }
  646. /**
  647. * Send data to open socket
  648. *
  649. * @param string $data
  650. * @return boolean Only returns true if all data was sent
  651. */
  652. private function send($data) {
  653. if ($this->socket) {
  654. $result = fwrite($this->socket, $data);
  655. if (($result !== false) && ($result == strlen($data))) {
  656. $this->serverAliveTime = time();
  657. return true;
  658. }
  659. }
  660. return false;
  661. }
  662. /**
  663. * Read bytes from socket until x bytes read, eof reached or socket timeout
  664. *
  665. * @param int $bytes Number of bytes to read
  666. * @return string Return bytes read as a string
  667. */
  668. private function readBytes($bytes) {
  669. if (!$this->socket) return false;
  670. //if ($bytes == 0) return '';
  671. $bytesLeft = $bytes;
  672. $result = '';
  673. do {
  674. // If stream at end, close down socket and exit
  675. if(feof($this->socket)) {
  676. $this->debugMessage('Reached EOF for stream');
  677. $this->close();
  678. return $result;
  679. }
  680. // Try to read from stream
  681. $str = fread($this->socket, $bytesLeft);
  682. if ($str !== false && strlen($str) > 0) {
  683. $result .= $str;
  684. $bytesLeft -= strlen($str);
  685. }
  686. if ($bytesLeft <= 0) {
  687. // If all bytes read, then return them
  688. $this->serverAliveTime = time();
  689. return $result;
  690. }
  691. // Check if timeout
  692. $info = stream_get_meta_data($this->socket);
  693. if ($info['timed_out']) {
  694. $this->debugMessage('Read timeout');
  695. return false;
  696. }
  697. // Wait a while before trying to read again (in micro seconds)
  698. usleep($this->socketReadDelay * 1000);
  699. } while (true);
  700. }
  701. /**
  702. * Encode length to bytes to send in stream
  703. *
  704. * @param integer $len
  705. * @return string
  706. */
  707. private function encodeLength($len) {
  708. if ($len < 0 || $len >= 128*128*128*128) {
  709. // illegal length
  710. return false;
  711. }
  712. $output = '';
  713. do {
  714. $byte = $len & 0x7f; // keep lowest 7 bits
  715. $len = $len >> 7; // shift away lowest 7 bits
  716. if ($len > 0) {
  717. $byte = $byte | 0x80; // set high bit to indicate continuation
  718. }
  719. $output .= chr($byte);
  720. } while ($len > 0);
  721. return $output;
  722. }
  723. /**
  724. * Return length of packet by reading from stream
  725. *
  726. * @return integer
  727. */
  728. private function readPacketLength() {
  729. $bytesRead = 0;
  730. $len = 0;
  731. $multiplier = 1;
  732. do {
  733. if ($bytesRead > 4) {
  734. return false; // Malformed length
  735. }
  736. $str = $this->readBytes(1);
  737. if ($str === false || strlen($str) != 1) {
  738. return false; // Unexpected end of stream
  739. }
  740. $byte = ord($str[0]);
  741. $len += ($byte & 0x7f) * $multiplier;
  742. $isContinued = ($byte & 0x80);
  743. if ($isContinued) {
  744. $multiplier *= 128;
  745. }
  746. $bytesRead++;
  747. } while ($isContinued);
  748. return $len;
  749. }
  750. /**
  751. * Create MQTT header from command and payload
  752. *
  753. * @param int $command Command to send
  754. * @param string $payload Payload to be sent
  755. *
  756. * @return string Header to send
  757. */
  758. private function createHeader($command, $payload = '') {
  759. return chr($command) . $this->encodeLength(strlen($payload));
  760. }
  761. /**
  762. * Read next packet from stream
  763. *
  764. * @return boolean
  765. */
  766. private function readNextPacket() {
  767. do {
  768. $header = $this->readBytes(1);
  769. if ($header === false) {
  770. $this->lastReadStatus = self::READ_STATUS_ERROR_HEADER;
  771. return false;
  772. }
  773. } while ((ord($header)&0xf0) == 0); // 0 is illegal control code to start with
  774. $packetLength = $this->readPacketLength();
  775. if ($packetLength === false) {
  776. $this->debugMessage('Could not decode packet length');
  777. $this->lastReadStatus = self::READ_STATUS_ERROR_PACKETLENGTH;
  778. return false;
  779. }
  780. $payload = $packetLength > 0 ? $this->readBytes($packetLength) : '';
  781. if ($payload === false) {
  782. $this->lastReadStatus = self::READ_STATUS_ERROR_PAYLOAD;
  783. return false;
  784. }
  785. $this->debugMessage('Packet response: '. self::str2hex($header . $payload));
  786. $this->lastReadStatus = self::READ_STATUS_OK;
  787. return $header . $payload;
  788. }
  789. public function getLastReadStatus() {
  790. return $this->lastReadStatus;
  791. }
  792. public function hasMoreToRead() {
  793. return ($this->lastReadStatus == self::READ_STATUS_OK) && $this->isConnected();
  794. }
  795. /**
  796. * Read packets from stream and save to queue. Quit after x packets or timeout.
  797. *
  798. * @param integer $maxPackets Packet id the message must match
  799. * @return integer Number of packets read
  800. */
  801. private function readPackets($maxPackets = 100) {
  802. $receivedPackets = 0;
  803. while (($receivedPackets < $maxPackets) && ($packet = $this->readNextPacket()) !== false) {
  804. $this->packetQueue[] = $packet;
  805. $receivedPackets++;
  806. }
  807. return $receivedPackets;
  808. }
  809. /**
  810. * Wait until a certain packet is found in the stream.
  811. * Save other recieved packets in queue.
  812. *
  813. * @param byte $header Header to look for (only 4 high bits) 0xf0
  814. * @param integer $verifyPacketId Packet id the message must match
  815. * @return boolean
  816. */
  817. private function waitForPacket($header, $verifyPacketId = false) {
  818. // first check unhandled packets
  819. foreach ($this->packetQueue as $key => $packet) {
  820. if ($this->isPacketVerified($packet, $header, $verifyPacketId)) {
  821. // if found, remove from queue and return packet
  822. unset($this->packetQueue[$key]);
  823. return $packet;
  824. }
  825. }
  826. // if not found in queue, start reading from stream until found or timeout
  827. do {
  828. $packet = $this->readNextPacket();
  829. if ($packet === false || empty($packet)) return false;
  830. if ($this->isPacketVerified($packet, $header, $verifyPacketId)) {
  831. return $packet;
  832. }
  833. // another packet found, save it to queue
  834. $this->packetQueue[] = $packet;
  835. } while(true);
  836. }
  837. /**
  838. * Check if packet is of a given type and packet id match latest sent packet id
  839. *
  840. * @param string $packet
  841. * @param char $header
  842. * @param integer $verifyPacketId
  843. * @return boolean
  844. */
  845. private function isPacketVerified($packet, $header, $verifyPacketId = false) {
  846. if (is_string($packet) && strlen($packet) >= 1) {
  847. if ((int)(ord($packet[0])&0xf0) == (int)($header&0xf0)) {
  848. if ($verifyPacketId === false) return true;
  849. if (strlen($packet) >= 3) {
  850. $receivedPacketId = (int)(ord($packet[1])<<8) + ord($packet[2]);
  851. if($verifyPacketId == $receivedPacketId) {
  852. return true;
  853. }
  854. }
  855. }
  856. }
  857. return false;
  858. }
  859. /**
  860. * Get packets matching a header from the queue and remove from queue
  861. *
  862. * @param char $header
  863. * @return string[]
  864. */
  865. public function getQueuePackets($header) {
  866. $foundPackets = [];
  867. foreach ($this->packetQueue as $key => $packet) {
  868. if ($this->isPacketVerified($packet, $header)) {
  869. $foundPackets[] = $packet;
  870. unset($this->packetQueue[$key]);
  871. }
  872. }
  873. return $foundPackets;
  874. }
  875. /**
  876. * Get PUBLISH packets and return them as messages
  877. *
  878. * @param integer $maxMessages Max messages to read
  879. * @param boolean $sendPubAck If true, then send PUBACK to MQTT-server (QoS 1)
  880. * @param boolean $sendPubRec If true, then send PUBREC to MQTT-server, wait for PUBREL and send PUBCOMP (QoS 2)
  881. * @return string[] All PUBLISH messages which were confirmed or no confirmation needed/wanted
  882. */
  883. public function getPublishMessages($maxMessages = 100, $sendPubAck = false, $sendPubRec = false) {
  884. $packetsRead = $this->readPackets($maxMessages);
  885. $packets = $this->getQueuePackets(self::MQTT_PUBLISH);
  886. $messages = [];
  887. foreach ($packets as $key => $packet) {
  888. $message = $this->decodePublish($packet);
  889. if ($message === false) {
  890. $this->debugMessage('Message could not be decoded');
  891. continue;
  892. }
  893. if ($sendPubAck && ($message['qos'] == self::MQTT_QOS1)) {
  894. if($this->sendPubAck($message['packetId']) === false) {
  895. $this->debugMessage('Failed to send PUBACK');
  896. continue;
  897. }
  898. } elseif ($sendPubRec && ($message['qos'] == self::MQTT_QOS2)) {
  899. // Send PUBREC
  900. if($this->sendPubRec($message['packetId']) === false) {
  901. $this->debugMessage('Failed to send PUBREC');
  902. continue;
  903. }
  904. // A PUBREL packet is expected
  905. $response = $this->waitForPacket(self::MQTT_PUBREL, $message['packetId']);
  906. if($response === false) {
  907. $this->debugMessage('Packet missing, expecting PUBREL');
  908. continue;
  909. }
  910. // Send PUBCOMP
  911. if($this->sendPubComp($message['packetId']) === false) {
  912. $this->debugMessage('Failed to send PUBCOMP');
  913. continue;
  914. }
  915. }
  916. // Package was successfully confirmed or no confirmation needed/wanted --> store it
  917. $messages[] = $message;
  918. }
  919. return $messages;
  920. }
  921. /**
  922. * Decode a publish packet to its attributes
  923. *
  924. * @param string $packet
  925. * @return array|boolean Return message or false if decode failed
  926. */
  927. public function decodePublish($packet) {
  928. if (!is_string($packet) || (strlen($packet) <= 3)) {
  929. return false;
  930. }
  931. $flags = ord($packet[0]) & 0x0f;
  932. $duplicate = ($flags == 0x80);
  933. $retain = ($flags == 0x01);
  934. $qos = ($flags>>1) & 0x03;
  935. $topicLength = (ord($packet[1])<<8) + ord($packet[2]);
  936. $topic = substr($packet, 3, $topicLength);
  937. $payload = substr($packet, 3 + $topicLength); // Get the payload of the packet
  938. if ($qos == 0) {
  939. // no packet id for QoS 0, the payload is the message
  940. $message = $payload;
  941. $packetId = NULL;
  942. } else {
  943. if (strlen($payload) >= 2) {
  944. $packetId = (ord($payload[0])<<8) + ord($payload[1]);
  945. $message = substr($payload, 2); // skip packet id (2 bytes) for QoS 1 and 2
  946. } else {
  947. // 2 byte packet id required, but not found. exit gracefully (no failure)
  948. $packetId = NULL;
  949. $message = '';
  950. }
  951. }
  952. return [
  953. 'topic' => self::convertActiveMqTopic($topic),
  954. 'message' => $message,
  955. 'retain' => $retain,
  956. 'duplicate' => $duplicate,
  957. 'qos' => $qos,
  958. 'packetId' => $packetId,
  959. ];
  960. }
  961. /**
  962. * Replace ActiveMQ special characters to MQTT-standard
  963. *
  964. * @param string $topic
  965. * @return string
  966. */
  967. private static function convertActiveMqTopic($topic) {
  968. $topic = str_replace(".","/", $topic);
  969. $topic = str_replace("*","+", $topic);
  970. $topic = str_replace(">","#", $topic);
  971. return $topic;
  972. }
  973. /**
  974. * Return a string interpreted as hex and ASCII (between 0x20-0x7f)
  975. * Good for displaying recieved packets
  976. *
  977. * @param string $str
  978. * @return string
  979. */
  980. private function str2hex($str) {
  981. $hex = '';
  982. $ascii = '';
  983. for ($i=0; $i<strlen($str); $i++) {
  984. $char = $str[$i];
  985. if (ord($char) >= 0x20 && ord($char) <= 0x7f) {
  986. $ascii .= $char;
  987. } else {
  988. $ascii .= '.';
  989. }
  990. $hex .= dechex(ord($char)).' ';
  991. }
  992. return $hex . '"'. $ascii .'"';
  993. }
  994. public function dumpQueue() {
  995. foreach ($this->packetQueue as $packet) {
  996. $this->str2hex($packet) . PHP_EOL;
  997. }
  998. }
  999. }