You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1101 lines
38 KiB

  1. <?php
  2. declare(strict_types=1);
  3. namespace App\Libs;
  4. class MQTTClient {
  5. // MQTT control packet types (here left shifted 4 bits)
  6. const MQTT_CONNECT = 0x10; // Client request to connect to Server
  7. const MQTT_CONNACK = 0x20; // Connect acknowledgment
  8. const MQTT_PUBLISH = 0x30; // Publish message
  9. const MQTT_PUBACK = 0x40; // Publish acknowledgment
  10. const MQTT_PUBREC = 0x50; // Publish received (assured delivery part 1)
  11. const MQTT_PUBREL = 0x62; // Publish release (assured delivery part 2)
  12. const MQTT_PUBCOMP = 0x70; // Publish complete (assured delivery part 3)
  13. const MQTT_SUBSCRIBE = 0x80; // Client subscribe request
  14. const MQTT_SUBACK = 0x90; // Subscribe acknowledgment
  15. const MQTT_UNSUBSCRIBE = 0xa0; // Unsubscribe request
  16. const MQTT_UNSUBACK = 0xb0; // Unsubscribe acknowledgment
  17. const MQTT_PINGREQ = 0xc0; // PING request
  18. const MQTT_PINGRESP = 0xd0; // PING response
  19. const MQTT_DISCONNECT = 0xe0; // Client is disconnecting
  20. // MQTT quality of service levels
  21. const MQTT_QOS0 = 0x00;
  22. const MQTT_QOS1 = 0x01;
  23. const MQTT_QOS2 = 0x02;
  24. // MQTT status on last read from stream
  25. const READ_STATUS_NO_READ = 0;
  26. const READ_STATUS_OK = 200;
  27. const READ_STATUS_ERROR = 400;
  28. const READ_STATUS_ERROR_HEADER = 401;
  29. const READ_STATUS_ERROR_PACKETLENGTH = 402;
  30. const READ_STATUS_ERROR_PAYLOAD = 403;
  31. private $socket = null; // Socket resource reference
  32. private $socketTimeout; // Default socket timeout in milliseconds
  33. private $socketReadDelay = 1000; // Delay in milliseconds between read attempts on socket
  34. private $protocol = 'tcp';
  35. private $serverAddress;
  36. private $serverPort;
  37. private $clientId;
  38. public $caFile = null;
  39. public $localCert = null;
  40. public $localPrivateKey = null;
  41. private $connectCleanSession;
  42. private $connectKeepAlive;
  43. private $connectWill = false;
  44. private $connectWillQos;
  45. private $connectWillRetain;
  46. private $connectUsername;
  47. private $connectPassword;
  48. private $willTopic;
  49. private $willMessage;
  50. private $packetId; // Packet identifier to validate return packets
  51. private $pingReqTime; // Time when last PINGREQ was sent
  52. private $serverAliveTime; // Time for last response from server
  53. private $debug = false;
  54. private $lastReadStatus = self::READ_STATUS_NO_READ;
  55. private $packetQueue = []; // Queue for received but unhandled packages
  56. public $lastConnectResult = 0;
  57. /**
  58. * Class constructor - Sets up connection parameters
  59. *
  60. * @param string $address Address to server
  61. * @param string $port Port to server
  62. * @param string $protocol Which protocol to use
  63. */
  64. function __construct($address, $port=null, $protocol='tcp'){
  65. if ($this->setConnection($address, $port, $protocol)) {
  66. ;
  67. }
  68. $this->packetId = rand(1,100)*100; // Reduce risk of creating duplicate ids in sequential sessions
  69. }
  70. /**
  71. * Class destructor - Close socket
  72. */
  73. function __destruct(){
  74. $this->close();
  75. }
  76. /**
  77. * Setup conection parameters
  78. *
  79. * @param string $address
  80. * @param string $port
  81. * @param string $protocol
  82. * @return boolean If return false then using default parameters where validation failed
  83. */
  84. function setConnection($address, $port=null, $protocol='tcp'){
  85. $this->serverAddress = $address;
  86. $this->serverPort = $port;
  87. // Validate protocol
  88. $protocol = strtolower($protocol);
  89. if (($protocol != 'tcp') && !self::isEncryptionProtocol($protocol)) {
  90. $this->debugMessage('Invalid protocol ('.$protocol.'). Setting to default (tcp).');
  91. $this->protocol = 'tcp';
  92. return false;
  93. }
  94. $this->protocol = $protocol;
  95. return true;
  96. }
  97. /**
  98. * Build url for connecting to stream
  99. *
  100. * @return string
  101. */
  102. private function getUrl() {
  103. $url = '';
  104. if ($this->protocol) $url .= $this->protocol .'://';
  105. $url .= $this->serverAddress;
  106. if ($this->serverPort) $url .= ':'. $this->serverPort;
  107. return $url;
  108. }
  109. /**
  110. * Check if encryption protocol is supported
  111. *
  112. * @param string $protcol
  113. * @return boolean
  114. */
  115. private static function isEncryptionProtocol($protocol) {
  116. return in_array(strtolower($protocol), ['ssl', 'tls', 'tlsv1.0', 'tlsv1.1', 'tlsv1.2', 'sslv3']);
  117. }
  118. /**
  119. * Sets server certificate and protocol for ssl/tls encryption
  120. *
  121. * @param string $caFile CA file to identify server
  122. * @param string $protocl Crypto protocol (See http://php.net/manual/en/migration56.openssl.php)
  123. * @return boolean False if settings failed, else true
  124. */
  125. public function setEncryption($caFile, $protocol = null) {
  126. if (file_exists($caFile)) {
  127. $this->caFile = $caFile;
  128. } else {
  129. $this->debugMessage('CA file not found');
  130. return false;
  131. }
  132. if(self::isEncryptionProtocol($protocol)) {
  133. $this->protocol = $protocol;
  134. } else if (!is_null($protocol)) {
  135. $this->debugMessage('Unknown encryption protocol');
  136. return false;
  137. }
  138. return true;
  139. }
  140. /**
  141. * Sets client crt and key files for client-side authentication
  142. *
  143. * @param string $crtFile Client certificate file
  144. * @param string $keyFile Client key file
  145. * @return boolean False if settings failed, else true
  146. */
  147. public function setClientEncryption($certificateFile, $keyFile) {
  148. if (!file_exists($certificateFile)) {
  149. $this->debugMessage('Client certificate file not found');
  150. return false;
  151. }
  152. if (!file_exists($keyFile)) {
  153. $this->debugMessage('Client key file not found');
  154. return false;
  155. }
  156. $this->localCert= $certificateFile;
  157. $this->localPrivateKey = $keyFile;
  158. return true;
  159. }
  160. /**
  161. * Set authentication details to be used when connecting
  162. *
  163. * @param string $username Username
  164. * @param string $password Password
  165. */
  166. public function setAuthentication($username, $password) {
  167. $this->connectUsername= $username;
  168. $this->connectPassword = $password;
  169. }
  170. /**
  171. * Set will (last message defined by MQTT) to send when connection is lost
  172. *
  173. * @param string $topic
  174. * @param string $message
  175. * @param integer $qos
  176. * @param boolean $retain
  177. */
  178. public function setWill($topic, $message, $qos=1, $retain=false) {
  179. $this->connectWill = true;
  180. $this->connectWillQos = $qos;
  181. $this->connectWillRetain = $retain;
  182. $this->willTopic = $topic;
  183. $this->willMessage = $message;
  184. }
  185. /**
  186. * Connect to MQTT server
  187. *
  188. * @param string $clientId Unique id used by the server to identify the client
  189. * @param boolean $cleanSession Set true to clear session on server, ie queued messages are purged (not recieved)
  190. * @param integer $keepAlive Number of seconds a connection is considered to be alive without traffic
  191. * @param integer $timeout Number of millliseconds before timeout when reading from socket
  192. * @return boolean Returns false if connection failed
  193. */
  194. public function sendConnect($clientId, $cleanSession=false, $keepAlive=10, $timeout=5000) {
  195. var_dump('serverAddress', $this->serverAddress);
  196. if (!$this->serverAddress) return false;
  197. // Basic validation of clientid
  198. // Note: A MQTT server may accept other chars and more than 23 chars in the clientid but that is optional,
  199. // all chars below up to 23 chars are required to be accepted (see section "3.1.3.1 Client Identifier" of the standard)
  200. if(preg_match("/[^0-9a-zA-Z]/",$clientId)) {
  201. $this->debugMessage('ClientId can only contain characters 0-9,a-z,A-Z');
  202. return false;
  203. }
  204. if(strlen($clientId) > 23) {
  205. $this->debugMessage('ClientId max length is 23 characters/numbers');
  206. return false;
  207. }
  208. $this->clientId = $clientId;
  209. $this->connectCleanSession = $cleanSession;
  210. $this->connectKeepAlive = $keepAlive;
  211. $this->socketTimeout = $timeout;
  212. // Setup certificates if encryption protocol selected
  213. if ($this->isEncryptionProtocol($this->protocol)) {
  214. $mozillaCiphers = implode(':', array(
  215. 'ECDHE-RSA-AES128-GCM-SHA256',
  216. 'ECDHE-ECDSA-AES128-GCM-SHA256',
  217. 'ECDHE-RSA-AES256-GCM-SHA384',
  218. 'ECDHE-ECDSA-AES256-GCM-SHA384',
  219. 'DHE-RSA-AES128-GCM-SHA256',
  220. 'DHE-DSS-AES128-GCM-SHA256',
  221. 'kEDH+AESGCM',
  222. 'ECDHE-RSA-AES128-SHA256',
  223. 'ECDHE-ECDSA-AES128-SHA256',
  224. 'ECDHE-RSA-AES128-SHA',
  225. 'ECDHE-ECDSA-AES128-SHA',
  226. 'ECDHE-RSA-AES256-SHA384',
  227. 'ECDHE-ECDSA-AES256-SHA384',
  228. 'ECDHE-RSA-AES256-SHA',
  229. 'ECDHE-ECDSA-AES256-SHA',
  230. 'DHE-RSA-AES128-SHA256',
  231. 'DHE-RSA-AES128-SHA',
  232. 'DHE-DSS-AES128-SHA256',
  233. 'DHE-RSA-AES256-SHA256',
  234. 'DHE-DSS-AES256-SHA',
  235. 'DHE-RSA-AES256-SHA',
  236. 'AES128-GCM-SHA256',
  237. 'AES256-GCM-SHA384',
  238. 'ECDHE-RSA-RC4-SHA',
  239. 'ECDHE-ECDSA-RC4-SHA',
  240. 'AES128',
  241. 'AES256',
  242. 'RC4-SHA',
  243. 'HIGH',
  244. '!aNULL',
  245. '!eNULL',
  246. '!EXPORT',
  247. '!DES',
  248. '!3DES',
  249. '!MD5',
  250. '!PSK'
  251. ));
  252. // Secure socket communication with these parameters, a ca-file is required
  253. $options = [];
  254. $options['verify_peer'] = true;
  255. $options['verify_peer_name'] = true;
  256. $options['verify_depth'] = 5;
  257. $options['disable_compression'] = true;
  258. $options['SNI_enabled'] = true;
  259. $options['ciphers'] = $mozillaCiphers;
  260. if($this->caFile) {
  261. $options['cafile'] = $this->caFile;
  262. }
  263. if($this->localCert) {
  264. $options['local_cert'] = $this->localCert;
  265. if ($this->localPrivateKey) {
  266. $options['local_pk'] = $this->localPrivateKey;
  267. }
  268. }
  269. $socketContext = stream_context_create(['ssl' => $options]);
  270. $this->debugMessage('Settings socket options: '. var_export($options, true));
  271. } else {
  272. $socketContext = null;
  273. }
  274. // Try to open socket
  275. try {
  276. $this->debugMessage('Opening socket to: '. $this->getUrl());
  277. if ($socketContext) {
  278. $this->socket = stream_socket_client($this->getUrl(), $errno, $errstr, 10, STREAM_CLIENT_CONNECT, $socketContext);
  279. } else {
  280. $this->socket = stream_socket_client($this->getUrl(), $errno, $errstr, 10, STREAM_CLIENT_CONNECT);
  281. }
  282. } catch (\ErrorException $error) {
  283. $this->debugMessage('Exception: Could not open stream with error message: '. $error->getMessage());
  284. $this->socket = null;
  285. return false;
  286. }
  287. // Check if socket was opened successfully
  288. if ($this->socket === false) {
  289. $this->socket = null;
  290. $this->debugMessage('Connection failed. Error-no:'. $errno .' Error message: '. $errstr);
  291. return false;
  292. }
  293. // Set socket timeout
  294. ini_set('default_socket_timeout', '10');
  295. stream_set_timeout($this->socket, 0, $this->socketTimeout * 1000);
  296. // Set stream to non-blocking mode, ie do not wait to read if stream is empty
  297. stream_set_blocking($this->socket, true);
  298. // Calculate connect flags to use in CONNECT header
  299. $connectFlags = 0;
  300. if ($this->connectCleanSession) $connectFlags += 0x02;
  301. if ($this->connectWill) {
  302. $connectFlags += 0x04;
  303. if ($this->connectWillQos) $connectFlags += ($this->connectWill << 3);
  304. if ($this->connectWillRetain) $connectFlags += 0x20;
  305. }
  306. if ($this->connectUsername) {
  307. $connectFlags += 0x80;
  308. if ($this->connectPassword) $connectFlags += 0x40;
  309. }
  310. // Build payload and header for CONNECT-packet
  311. $payload = chr(0x00).chr(0x04); // MSB & LSB length of MQTT = 4
  312. $payload .= 'MQTT';
  313. $payload .= chr(0x04); // Protocol level (3.1.1)
  314. $payload .= chr($connectFlags); // Connect flags
  315. $payload .= chr($this->connectKeepAlive >> 8); // Keepalive (MSB)
  316. $payload .= chr($this->connectKeepAlive & 0xff); // Keepalive (LSB)
  317. if ($this->connectCleanSession && empty($this->clientId)) {
  318. $this->clientId = rand(1,999999999);
  319. }
  320. if ($this->clientId) {
  321. $payload .= $this->createPayload($this->clientId);
  322. }
  323. if($this->connectWill){
  324. $payload .= $this->createPayload($this->willTopic);
  325. $payload .= $this->createPayload($this->willMessage);
  326. }
  327. if($this->connectUsername) {
  328. $payload .= $this->createPayload($this->connectUsername);
  329. }
  330. if ($this->connectPassword) {
  331. $payload .= $this->createPayload($this->connectPassword);
  332. }
  333. $header = $this->createHeader(self::MQTT_CONNECT, $payload);
  334. $this->debugMessage('Sending CONNECT');
  335. $this->send($header . $payload);
  336. // Wait for CONNACK packet
  337. $response = $this->waitForPacket(self::MQTT_CONNACK);
  338. if($response !== false && ($response[2] == chr(0))) {
  339. $this->debugMessage('Connected to MQTT');
  340. $this->lastConnectResult = 0;
  341. return true;
  342. } else {
  343. $this->debugMessage('Connection failed! Error: '. ord($response[2]));
  344. $this->lastConnectResult = ord($response[2]);
  345. $this->close();
  346. return false;
  347. }
  348. }
  349. /**
  350. * Publish a topic and message (QoS 0,1,2 supported)
  351. *
  352. * @param string $topic
  353. * @param string $message
  354. * @param byte $qos
  355. * @return boolean
  356. */
  357. public function sendPublish($topic, $message, $qos = self::MQTT_QOS1, $retain = 0) {
  358. var_dump('connect:', $this->isConnected());
  359. if(!$this->isConnected()) return false;
  360. var_dump('sendpublish', [$topic, $message, $qos]);
  361. if($qos!=self::MQTT_QOS0 && $qos!=self::MQTT_QOS1 && $qos!=self::MQTT_QOS2) return false;
  362. $packetId = $this->getNextPacketId();
  363. $payload = $this->createPayload($topic);
  364. if($qos >= self::MQTT_QOS1) {
  365. // Packet identifier required for QoS level >= 1
  366. $payload .= $this->getPacketIdPayload();
  367. }
  368. $payload .= $message;
  369. $dupFlag = 0;
  370. $header = $this->createHeader(self::MQTT_PUBLISH + ($dupFlag<<3) + ($qos<<1) + $retain, $payload);
  371. $this->debugMessage('Sending PUBLISH');
  372. var_dump('sendpublish-$payload', $header.$payload);
  373. $this->send($header . $payload);
  374. if($qos == self::MQTT_QOS1) {
  375. // If QoS level 1, only a PUBACK packet is expected
  376. $response = $this->waitForPacket(self::MQTT_PUBACK, $packetId);
  377. if($response === false) {
  378. $this->debugMessage('Packet missing, expecting PUBACK');
  379. return false;
  380. }
  381. } elseif($qos == self::MQTT_QOS2) {
  382. // If QoS level 2, a PUBREC packet is expected
  383. $response = $this->waitForPacket(self::MQTT_PUBREC, $packetId);
  384. if($response === false) {
  385. $this->debugMessage('Packet missing, expecting PUBREC');
  386. return false;
  387. }
  388. // Send PUBREL
  389. $response = $this->sendPubRel($packetId);
  390. if($response === false) {
  391. $this->debugMessage('Failed to send PUBREL');
  392. return false;
  393. }
  394. // A PUBCOMP packet is expected
  395. $response = $this->waitForPacket(self::MQTT_PUBCOMP, $packetId);
  396. if($response === false) {
  397. $this->debugMessage('Packet missing, expecting PUBCOMP');
  398. return false;
  399. }
  400. }
  401. return true;
  402. }
  403. /**
  404. * Send PUBACK as response to a recieved PUBLISH packet (QoS Level 1)
  405. *
  406. * @param integer $packetId Packet identifier of PUBLISH packet
  407. * @return boolean Returns true if packet sent successfully
  408. */
  409. public function sendPubAck($packetId) {
  410. if(!$this->isConnected()) return false;
  411. $payload = chr(($packetId & 0xff00)>>8) . chr($packetId & 0xff);
  412. $header = $this->createHeader(self::MQTT_PUBACK, $payload);
  413. $this->debugMessage('Sending PUBACK');
  414. $this->send($header . $payload);
  415. return true;
  416. }
  417. /**
  418. * Send PUBREC as response to a recieved PUBLISH packet (QoS Level 2)
  419. *
  420. * @param integer $packetId Packet identifier of PUBLISH packet
  421. * @return boolean Returns true if packet sent successfully
  422. */
  423. public function sendPubRec($packetId) {
  424. if(!$this->isConnected()) return false;
  425. $payload = chr(($packetId & 0xff00)>>8) . chr($packetId & 0xff);
  426. $header = $this->createHeader(self::MQTT_PUBREC, $payload);
  427. $this->debugMessage('Sending PUBREC');
  428. $this->send($header . $payload);
  429. return true;
  430. }
  431. /**
  432. * Send PUBREL as response to a recieved PUBREC packet (QoS Level 2)
  433. *
  434. * @param integer $packetId Packet identifier of PUBLISH packet
  435. * @return boolean Returns true if packet sent successfully
  436. */
  437. public function sendPubRel($packetId) {
  438. if(!$this->isConnected()) return false;
  439. $payload = chr(($packetId & 0xff00)>>8) . chr($packetId & 0xff);
  440. $header = $this->createHeader(self::MQTT_PUBREL, $payload);
  441. $this->debugMessage('Sending PUBREL');
  442. $this->send($header . $payload);
  443. return true;
  444. }
  445. /**
  446. * Send PUBCOMP as response to a recieved PUBREL packet (QoS Level 2)
  447. *
  448. * @param integer $packetId Packet identifier of PUBLISH packet
  449. * @return boolean Returns true if packet sent successfully
  450. */
  451. public function sendPubComp($packetId) {
  452. if(!$this->isConnected()) return false;
  453. $payload = chr(($packetId & 0xff00)>>8) . chr($packetId & 0xff);
  454. $header = $this->createHeader(self::MQTT_PUBCOMP, $payload);
  455. $this->debugMessage('Sending PUBCOMP');
  456. $this->send($header . $payload);
  457. return true;
  458. }
  459. /**
  460. * Subscribe to topics with a quality of service
  461. *
  462. * @param string[] $topics Topics to subscribe for
  463. * @param integer $qos Quality of serivce for all topics
  464. * @return boolean Returns true if SUBACK was recieved
  465. */
  466. public function sendSubscribe($topics, $qos = self::MQTT_QOS1) {
  467. if (!is_array($topics)) $topics = [$topics];
  468. if(!$this->isConnected()) return false;
  469. $packetId = $this->getNextPacketId();
  470. $payload = $this->getPacketIdPayload();
  471. foreach($topics as $topic) {
  472. $payload .= $this->createPayload($topic);
  473. $payload .= chr($qos);
  474. }
  475. $header = $this->createHeader(self::MQTT_SUBSCRIBE + 0x02, $payload);
  476. $this->debugMessage('Sending SUBSCRIBE');
  477. $this->send($header . $payload);
  478. // A SUBACK packet is expected
  479. $response = $this->waitForPacket(self::MQTT_SUBACK, $packetId);
  480. if($response === false) {
  481. $this->debugMessage('Packet missing, expecting SUBACK');
  482. return false;
  483. }
  484. $responsePayload = substr($response, 3); // skip header and identifier (3 bytes)
  485. if (strlen($responsePayload) != count($topics)) {
  486. $this->debugMessage('Did not recieve SUBACK for all topics');
  487. return false;
  488. }
  489. // Check which subscriptions that were approved
  490. $topicsResult = [];
  491. $i = 0;
  492. foreach ($topics as $topic) {
  493. $topicsResult[$topic] = [];
  494. if ($responsePayload[$i] > 0x02) {
  495. $topicsResult[$topic]['success'] = false;
  496. $topicsResult[$topic]['qosGiven'] = null;
  497. } else {
  498. $topicsResult[$topic]['success'] = true;
  499. $topicsResult[$topic]['qosGiven'] = (int) ord($responsePayload[$i]);
  500. }
  501. $i++;
  502. }
  503. return $topicsResult;
  504. }
  505. /**
  506. * Send unsubscribe packet for given topics
  507. *
  508. * @param string[] $topics
  509. * @return boolean Returns true if UNSUBACK was recieved
  510. */
  511. public function sendUnsubscribe($topics) {
  512. if(!$this->isConnected()) return false;
  513. $packetId = $this->getNextPacketId();
  514. $payload = $this->getPacketIdPayload();
  515. foreach($topics as $topic) {
  516. $payload .= $this->createPayload($topic);
  517. }
  518. $header = $this->createHeader(self::MQTT_UNSUBSCRIBE + 0x02, $payload);
  519. $this->debugMessage('Sending UNSUBSCRIBE');
  520. $this->send($header . $payload);
  521. // An UNSUBACK packet is expected
  522. $response = $this->waitForPacket(self::MQTT_UNSUBACK, $packetId);
  523. if($response === false) {
  524. $this->debugMessage('Invalid packet received, expecting UNSUBACK');
  525. return false;
  526. }
  527. return true;
  528. }
  529. /**
  530. * Sends PINGREQ packet to server
  531. *
  532. * @return boolean Returns true if PINGRESP was recieved
  533. */
  534. public function sendPing() {
  535. if(!$this->isConnected()) return false;
  536. $this->timeSincePingReq = time();
  537. $header = $this->createHeader(self::MQTT_PINGREQ);
  538. $this->debugMessage('Sending PING');
  539. $this->send($header);
  540. $this->pingReqTime = time();
  541. // A PINGRESP packet is expected
  542. $response = $this->waitForPacket(self::MQTT_PINGRESP);
  543. if($response === false) {
  544. $this->debugMessage('Invalid packet received, expecting PINGRESP');
  545. return false;
  546. }
  547. return true;
  548. }
  549. /**
  550. * Send disconnect and close socket
  551. */
  552. public function sendDisconnect() {
  553. if($this->isConnected()) {
  554. $header = $this->createHeader(self::MQTT_DISCONNECT);
  555. $this->debugMessage('Sending DISCONNECT');
  556. $this->send($header);
  557. $this->close();
  558. }
  559. }
  560. /**
  561. * Close socket
  562. */
  563. public function close() {
  564. if($this->isConnected()) {
  565. $this->debugMessage('Closing socket');
  566. stream_socket_shutdown($this->socket, STREAM_SHUT_RDWR);
  567. $this->socket = null;
  568. $this->serverAliveTime = null;
  569. }
  570. }
  571. /**
  572. * Check if connected to stream
  573. * @return boolean
  574. */
  575. public function isConnected() {
  576. return !empty($this->socket);
  577. }
  578. /**
  579. * Check if connection is alive
  580. * @return boolean
  581. */
  582. public function isAlive() {
  583. return $this->isConnected() && ($this->serverAliveTime + $this->connectKeepAlive <= time());
  584. }
  585. /**
  586. * Set debug mode, if true then output progress messages
  587. *
  588. * @param boolean $mode
  589. */
  590. public function setDebug($mode = true) {
  591. $this->debug = $mode;
  592. }
  593. /**
  594. * Print message to console if debug mode is on
  595. *
  596. * @param string $message
  597. */
  598. private function debugMessage($message) {
  599. if ($this->debug) {
  600. echo 'MQTT: '. $message .PHP_EOL;
  601. }
  602. }
  603. /**
  604. * Return next packet identifier to use in MQTT packet
  605. * Max 2 bytes to be used, restart on 0 if end reached
  606. *
  607. * @return integer
  608. */
  609. private function getNextPacketId() {
  610. return ($this->packetId = ($this->packetId + 1) & 0xffff);
  611. }
  612. /**
  613. * Return payload of packet id, use latest generated packet id as default
  614. *
  615. * @param integer $packetId
  616. * @return string Two chars with apyload to add in MQTT-message
  617. */
  618. private function getPacketIdPayload($packetId = null) {
  619. if (empty($packetId)) $packetId = $this->packetId;
  620. return chr(($packetId & 0xff00)>>8) . chr($packetId & 0xff);
  621. }
  622. /**
  623. * Add payload length as bytes to begining of string and return
  624. *
  625. * @param string $payload
  626. * @return string
  627. */
  628. private function createPayload($payload) {
  629. $fullLength = strlen($payload);
  630. $retval = chr($fullLength>>8).chr($fullLength&0xff).$payload;
  631. return $retval;
  632. }
  633. /**
  634. * Decode payload using inital length (2 bytes) and return as string array
  635. *
  636. * @param string $payload
  637. * @return string[]
  638. */
  639. private function decodePayload($payload) {
  640. $result = [];
  641. while (strlen($payload) >= 2) {
  642. $length = ord($payload[0])<<8 + ord($payload[1]);
  643. if (strlen($payload) <= $length + 2) {
  644. $result[] = substr($payload, 2, $length);
  645. }
  646. $payload = substr($payload, min($length + 2, strlen($payload)));
  647. }
  648. return $result;
  649. }
  650. /**
  651. * Send data to open socket
  652. *
  653. * @param string $data
  654. * @return boolean Only returns true if all data was sent
  655. */
  656. private function send($data) {
  657. if ($this->socket) {
  658. $result = fwrite($this->socket, $data);
  659. if (($result !== false) && ($result == strlen($data))) {
  660. $this->serverAliveTime = time();
  661. return true;
  662. }
  663. }
  664. return false;
  665. }
  666. /**
  667. * Read bytes from socket until x bytes read, eof reached or socket timeout
  668. *
  669. * @param int $bytes Number of bytes to read
  670. * @return string Return bytes read as a string
  671. */
  672. private function readBytes($bytes) {
  673. if (!$this->socket) return false;
  674. //if ($bytes == 0) return '';
  675. $bytesLeft = $bytes;
  676. $result = '';
  677. do {
  678. // If stream at end, close down socket and exit
  679. if(feof($this->socket)) {
  680. $this->debugMessage('Reached EOF for stream');
  681. $this->close();
  682. return $result;
  683. }
  684. // Try to read from stream
  685. $str = fread($this->socket, $bytesLeft);
  686. if ($str !== false && strlen($str) > 0) {
  687. $result .= $str;
  688. $bytesLeft -= strlen($str);
  689. }
  690. if ($bytesLeft <= 0) {
  691. // If all bytes read, then return them
  692. $this->serverAliveTime = time();
  693. return $result;
  694. }
  695. // Check if timeout
  696. $info = stream_get_meta_data($this->socket);
  697. if ($info['timed_out']) {
  698. $this->debugMessage('Read timeout');
  699. return false;
  700. }
  701. // Wait a while before trying to read again (in micro seconds)
  702. usleep($this->socketReadDelay * 1000);
  703. } while (true);
  704. }
  705. /**
  706. * Encode length to bytes to send in stream
  707. *
  708. * @param integer $len
  709. * @return string
  710. */
  711. private function encodeLength($len) {
  712. if ($len < 0 || $len >= 128*128*128*128) {
  713. // illegal length
  714. return false;
  715. }
  716. $output = '';
  717. do {
  718. $byte = $len & 0x7f; // keep lowest 7 bits
  719. $len = $len >> 7; // shift away lowest 7 bits
  720. if ($len > 0) {
  721. $byte = $byte | 0x80; // set high bit to indicate continuation
  722. }
  723. $output .= chr($byte);
  724. } while ($len > 0);
  725. return $output;
  726. }
  727. /**
  728. * Return length of packet by reading from stream
  729. *
  730. * @return integer
  731. */
  732. private function readPacketLength() {
  733. $bytesRead = 0;
  734. $len = 0;
  735. $multiplier = 1;
  736. do {
  737. if ($bytesRead > 4) {
  738. return false; // Malformed length
  739. }
  740. $str = $this->readBytes(1);
  741. if ($str === false || strlen($str) != 1) {
  742. return false; // Unexpected end of stream
  743. }
  744. $byte = ord($str[0]);
  745. $len += ($byte & 0x7f) * $multiplier;
  746. $isContinued = ($byte & 0x80);
  747. if ($isContinued) {
  748. $multiplier *= 128;
  749. }
  750. $bytesRead++;
  751. } while ($isContinued);
  752. return $len;
  753. }
  754. /**
  755. * Create MQTT header from command and payload
  756. *
  757. * @param int $command Command to send
  758. * @param string $payload Payload to be sent
  759. *
  760. * @return string Header to send
  761. */
  762. private function createHeader($command, $payload = '') {
  763. return chr($command) . $this->encodeLength(strlen($payload));
  764. }
  765. /**
  766. * Read next packet from stream
  767. *
  768. * @return boolean
  769. */
  770. private function readNextPacket() {
  771. do {
  772. $header = $this->readBytes(1);
  773. if ($header === false) {
  774. $this->lastReadStatus = self::READ_STATUS_ERROR_HEADER;
  775. return false;
  776. }
  777. } while ((ord($header)&0xf0) == 0); // 0 is illegal control code to start with
  778. $packetLength = $this->readPacketLength();
  779. if ($packetLength === false) {
  780. $this->debugMessage('Could not decode packet length');
  781. $this->lastReadStatus = self::READ_STATUS_ERROR_PACKETLENGTH;
  782. return false;
  783. }
  784. $payload = $packetLength > 0 ? $this->readBytes($packetLength) : '';
  785. if ($payload === false) {
  786. $this->lastReadStatus = self::READ_STATUS_ERROR_PAYLOAD;
  787. return false;
  788. }
  789. $this->debugMessage('Packet response: '. self::str2hex($header . $payload));
  790. $this->lastReadStatus = self::READ_STATUS_OK;
  791. return $header . $payload;
  792. }
  793. public function getLastReadStatus() {
  794. return $this->lastReadStatus;
  795. }
  796. public function hasMoreToRead() {
  797. return ($this->lastReadStatus == self::READ_STATUS_OK) && $this->isConnected();
  798. }
  799. /**
  800. * Read packets from stream and save to queue. Quit after x packets or timeout.
  801. *
  802. * @param integer $maxPackets Packet id the message must match
  803. * @return integer Number of packets read
  804. */
  805. private function readPackets($maxPackets = 100) {
  806. $receivedPackets = 0;
  807. while (($receivedPackets < $maxPackets) && ($packet = $this->readNextPacket()) !== false) {
  808. $this->packetQueue[] = $packet;
  809. $receivedPackets++;
  810. }
  811. return $receivedPackets;
  812. }
  813. /**
  814. * Wait until a certain packet is found in the stream.
  815. * Save other recieved packets in queue.
  816. *
  817. * @param byte $header Header to look for (only 4 high bits) 0xf0
  818. * @param integer $verifyPacketId Packet id the message must match
  819. * @return boolean
  820. */
  821. private function waitForPacket($header, $verifyPacketId = false) {
  822. // first check unhandled packets
  823. foreach ($this->packetQueue as $key => $packet) {
  824. if ($this->isPacketVerified($packet, $header, $verifyPacketId)) {
  825. // if found, remove from queue and return packet
  826. unset($this->packetQueue[$key]);
  827. return $packet;
  828. }
  829. }
  830. // if not found in queue, start reading from stream until found or timeout
  831. do {
  832. $packet = $this->readNextPacket();
  833. if ($packet === false || empty($packet)) return false;
  834. if ($this->isPacketVerified($packet, $header, $verifyPacketId)) {
  835. return $packet;
  836. }
  837. // another packet found, save it to queue
  838. $this->packetQueue[] = $packet;
  839. } while(true);
  840. }
  841. /**
  842. * Check if packet is of a given type and packet id match latest sent packet id
  843. *
  844. * @param string $packet
  845. * @param char $header
  846. * @param integer $verifyPacketId
  847. * @return boolean
  848. */
  849. private function isPacketVerified($packet, $header, $verifyPacketId = false) {
  850. if (is_string($packet) && strlen($packet) >= 1) {
  851. if ((int)(ord($packet[0])&0xf0) == (int)($header&0xf0)) {
  852. if ($verifyPacketId === false) return true;
  853. if (strlen($packet) >= 3) {
  854. $receivedPacketId = (int)(ord($packet[1])<<8) + ord($packet[2]);
  855. if($verifyPacketId == $receivedPacketId) {
  856. return true;
  857. }
  858. }
  859. }
  860. }
  861. return false;
  862. }
  863. /**
  864. * Get packets matching a header from the queue and remove from queue
  865. *
  866. * @param char $header
  867. * @return string[]
  868. */
  869. public function getQueuePackets($header) {
  870. $foundPackets = [];
  871. foreach ($this->packetQueue as $key => $packet) {
  872. if ($this->isPacketVerified($packet, $header)) {
  873. $foundPackets[] = $packet;
  874. unset($this->packetQueue[$key]);
  875. }
  876. }
  877. return $foundPackets;
  878. }
  879. /**
  880. * Get PUBLISH packets and return them as messages
  881. *
  882. * @param integer $maxMessages Max messages to read
  883. * @param boolean $sendPubAck If true, then send PUBACK to MQTT-server (QoS 1)
  884. * @param boolean $sendPubRec If true, then send PUBREC to MQTT-server, wait for PUBREL and send PUBCOMP (QoS 2)
  885. * @return string[] All PUBLISH messages which were confirmed or no confirmation needed/wanted
  886. */
  887. public function getPublishMessages($maxMessages = 100, $sendPubAck = false, $sendPubRec = false) {
  888. $packetsRead = $this->readPackets($maxMessages);
  889. $packets = $this->getQueuePackets(self::MQTT_PUBLISH);
  890. $messages = [];
  891. foreach ($packets as $key => $packet) {
  892. $message = $this->decodePublish($packet);
  893. if ($message === false) {
  894. $this->debugMessage('Message could not be decoded');
  895. continue;
  896. }
  897. if ($sendPubAck && ($message['qos'] == self::MQTT_QOS1)) {
  898. if($this->sendPubAck($message['packetId']) === false) {
  899. $this->debugMessage('Failed to send PUBACK');
  900. continue;
  901. }
  902. } elseif ($sendPubRec && ($message['qos'] == self::MQTT_QOS2)) {
  903. // Send PUBREC
  904. if($this->sendPubRec($message['packetId']) === false) {
  905. $this->debugMessage('Failed to send PUBREC');
  906. continue;
  907. }
  908. // A PUBREL packet is expected
  909. $response = $this->waitForPacket(self::MQTT_PUBREL, $message['packetId']);
  910. if($response === false) {
  911. $this->debugMessage('Packet missing, expecting PUBREL');
  912. continue;
  913. }
  914. // Send PUBCOMP
  915. if($this->sendPubComp($message['packetId']) === false) {
  916. $this->debugMessage('Failed to send PUBCOMP');
  917. continue;
  918. }
  919. }
  920. // Package was successfully confirmed or no confirmation needed/wanted --> store it
  921. $messages[] = $message;
  922. }
  923. return $messages;
  924. }
  925. /**
  926. * Decode a publish packet to its attributes
  927. *
  928. * @param string $packet
  929. * @return array|boolean Return message or false if decode failed
  930. */
  931. public function decodePublish($packet) {
  932. if (!is_string($packet) || (strlen($packet) <= 3)) {
  933. return false;
  934. }
  935. $flags = ord($packet[0]) & 0x0f;
  936. $duplicate = ($flags == 0x80);
  937. $retain = ($flags == 0x01);
  938. $qos = ($flags>>1) & 0x03;
  939. $topicLength = (ord($packet[1])<<8) + ord($packet[2]);
  940. $topic = substr($packet, 3, $topicLength);
  941. $payload = substr($packet, 3 + $topicLength); // Get the payload of the packet
  942. if ($qos == 0) {
  943. // no packet id for QoS 0, the payload is the message
  944. $message = $payload;
  945. $packetId = NULL;
  946. } else {
  947. if (strlen($payload) >= 2) {
  948. $packetId = (ord($payload[0])<<8) + ord($payload[1]);
  949. $message = substr($payload, 2); // skip packet id (2 bytes) for QoS 1 and 2
  950. } else {
  951. // 2 byte packet id required, but not found. exit gracefully (no failure)
  952. $packetId = NULL;
  953. $message = '';
  954. }
  955. }
  956. return [
  957. 'topic' => self::convertActiveMqTopic($topic),
  958. 'message' => $message,
  959. 'retain' => $retain,
  960. 'duplicate' => $duplicate,
  961. 'qos' => $qos,
  962. 'packetId' => $packetId,
  963. ];
  964. }
  965. /**
  966. * Replace ActiveMQ special characters to MQTT-standard
  967. *
  968. * @param string $topic
  969. * @return string
  970. */
  971. private static function convertActiveMqTopic($topic) {
  972. $topic = str_replace(".","/", $topic);
  973. $topic = str_replace("*","+", $topic);
  974. $topic = str_replace(">","#", $topic);
  975. return $topic;
  976. }
  977. /**
  978. * Return a string interpreted as hex and ASCII (between 0x20-0x7f)
  979. * Good for displaying recieved packets
  980. *
  981. * @param string $str
  982. * @return string
  983. */
  984. private function str2hex($str) {
  985. $hex = '';
  986. $ascii = '';
  987. for ($i=0; $i<strlen($str); $i++) {
  988. $char = $str[$i];
  989. if (ord($char) >= 0x20 && ord($char) <= 0x7f) {
  990. $ascii .= $char;
  991. } else {
  992. $ascii .= '.';
  993. }
  994. $hex .= dechex(ord($char)).' ';
  995. }
  996. return $hex . '"'. $ascii .'"';
  997. }
  998. public function dumpQueue() {
  999. foreach ($this->packetQueue as $packet) {
  1000. $this->str2hex($packet) . PHP_EOL;
  1001. }
  1002. }
  1003. }