From bb4ead3b49bea01e47f0513eb8011887a2d5f7a6 Mon Sep 17 00:00:00 2001 From: weigang Date: Mon, 28 Sep 2020 17:29:48 +0800 Subject: [PATCH] =?UTF-8?q?SSDB=E5=BC=82=E5=B8=B8=E6=8D=95=E8=8E=B7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/Commons/Log.php | 44 ++- app/Libs/SSDB.php | 561 ++++++++++++++++++++++++++++ app/Libs/SSDBException.php | 24 ++ app/Libs/SSDBTimeoutException.php | 23 ++ app/Libs/SSDB_Response.php | 37 ++ app/Libs/SimpleSSDB.php | 599 ------------------------------ 6 files changed, 688 insertions(+), 600 deletions(-) create mode 100644 app/Libs/SSDB.php create mode 100644 app/Libs/SSDBException.php create mode 100644 app/Libs/SSDBTimeoutException.php create mode 100644 app/Libs/SSDB_Response.php diff --git a/app/Commons/Log.php b/app/Commons/Log.php index 29a4e84..d99cb34 100644 --- a/app/Commons/Log.php +++ b/app/Commons/Log.php @@ -72,7 +72,49 @@ class Log //var_dump(json_encode($pushDatas) ); }); } - + + public function eventInTask($labels=null,$datas){ + + $client = $this->getClient(); + $kv = []; + foreach ($datas as $key => $value) { + $kv[] = $key."=".$value; + } + $pushLabels = []; + + $event_name = 'event_'.env('APP_ENV'); + if(!empty($labels)) $pushLabels[$event_name] = $labels; + + /* + * data format: + curl -v -H "Content-Type: application/json" -XPOST -s "http://39.96.12.39:3100/loki/api/v1/push" --data-raw \ + '{"streams": [{ "stream": { "foo": "bar2" }, "values": [ [ "1596274538882028800", "fizzbuzz" ] ] }]}' + */ + $ts = $this->getMsecTime() . '000000'; + $datas = implode("&",$kv); + $values = [[$ts,$datas]]; + $app_name = env('APP_NAME').'_'.env('APP_ENV'); + + $pushLabels['app']= $app_name; + $pushDatas = [ + 'streams'=>[ + [ + 'stream'=>$pushLabels, + 'values'=>$values, + ] + ] + ]; + $client->post( + env('LOG_HOST','http://39.96.12.39:3100').'/loki/api/v1/push', + [ + 'headers'=>[ + 'Content-Type'=>'application/json' + ], + 'body' => json_encode($pushDatas) + ] + ); + //var_dump(json_encode($pushDatas) ); + } public function push($datas){ $this->event(null,$datas); diff --git a/app/Libs/SSDB.php b/app/Libs/SSDB.php new file mode 100644 index 0000000..03751d0 --- /dev/null +++ b/app/Libs/SSDB.php @@ -0,0 +1,561 @@ +sock = @stream_socket_client("$host:$port", $errno, $errstr, $timeout_f); + if(!$this->sock){ + $jsonMsg = json_encode(['errno' => $errno, 'errstr' => $errstr, 'host' => $host, 'port' => $port]); + throw new SSDBException($jsonMsg); + } + $timeout_sec = intval($timeout_ms/1000); + $timeout_usec = ($timeout_ms - $timeout_sec * 1000) * 1000; + @stream_set_timeout($this->sock, $timeout_sec, $timeout_usec); + if(function_exists('stream_set_chunk_size')){ + @stream_set_chunk_size($this->sock, 1024 * 1024); + } + } + + function set_timeout($timeout_ms){ + $timeout_sec = intval($timeout_ms/1000); + $timeout_usec = ($timeout_ms - $timeout_sec * 1000) * 1000; + @stream_set_timeout($this->sock, $timeout_sec, $timeout_usec); + } + + /** + * After this method invoked with yesno=true, all requesting methods + * will not return a SSDB_Response object. + * And some certain methods like get/zget will return false + * when response is not ok(not_found, etc) + */ + function easy(){ + $this->_easy = true; + } + + function close(){ + if(!$this->_closed){ + @fclose($this->sock); + $this->_closed = true; + $this->sock = null; + } + } + + function closed(){ + return $this->_closed; + } + + private $batch_mode = false; + private $batch_cmds = array(); + + function batch(){ + $this->batch_mode = true; + $this->batch_cmds = array(); + return $this; + } + + function multi(){ + return $this->batch(); + } + + function exec(){ + $ret = array(); + foreach($this->batch_cmds as $op){ + list($cmd, $params) = $op; + $this->send_req($cmd, $params); + } + foreach($this->batch_cmds as $op){ + list($cmd, $params) = $op; + $resp = $this->recv_resp($cmd, $params); + $resp = $this->check_easy_resp($cmd, $resp); + $ret[] = $resp; + } + $this->batch_mode = false; + $this->batch_cmds = array(); + return $ret; + } + + function request(){ + $args = func_get_args(); + $cmd = array_shift($args); + return $this->__call($cmd, $args); + } + + private $async_auth_password = null; + + function auth($password){ + $this->async_auth_password = $password; + return null; + } + + function __call($cmd, $params=array()){ + $cmd = strtolower($cmd); + if($this->async_auth_password !== null){ + $pass = $this->async_auth_password; + $this->async_auth_password = null; + $auth = $this->__call('auth', array($pass)); + if($auth !== true){ + throw new SSDBException("Authentication failed"); + } + } + + if($this->batch_mode){ + $this->batch_cmds[] = array($cmd, $params); + return $this; + } + + try{ + if($this->send_req($cmd, $params) === false){ + $resp = new SSDB_Response('error', 'send error'); + }else{ + $resp = $this->recv_resp($cmd, $params); + } + }catch(SSDBException $e){ + if($this->_easy){ + throw $e; + }else{ + $resp = new SSDB_Response('error', $e->getMessage()); + } + } + + if($resp->code == 'noauth'){ + $msg = $resp->message; + throw new SSDBException($msg); + } + + $resp = $this->check_easy_resp($cmd, $resp); + return $resp; + } + + private function check_easy_resp($cmd, $resp){ + $this->last_resp = $resp; + if($this->_easy){ + if($resp->not_found()){ + return NULL; + }else if(!$resp->ok() && !is_array($resp->data)){ + return false; + }else{ + return $resp->data; + } + }else{ + $resp->cmd = $cmd; + return $resp; + } + } + + function multi_set($kvs=array()){ + $args = array(); + foreach($kvs as $k=>$v){ + $args[] = $k; + $args[] = $v; + } + return $this->__call(__FUNCTION__, $args); + } + + function multi_hset($name, $kvs=array()){ + $args = array($name); + foreach($kvs as $k=>$v){ + $args[] = $k; + $args[] = $v; + } + return $this->__call(__FUNCTION__, $args); + } + + function multi_zset($name, $kvs=array()){ + $args = array($name); + foreach($kvs as $k=>$v){ + $args[] = $k; + $args[] = $v; + } + return $this->__call(__FUNCTION__, $args); + } + + function incr($key, $val=1){ + $args = func_get_args(); + return $this->__call(__FUNCTION__, $args); + } + + function decr($key, $val=1){ + $args = func_get_args(); + return $this->__call(__FUNCTION__, $args); + } + + function zincr($name, $key, $score=1){ + $args = func_get_args(); + return $this->__call(__FUNCTION__, $args); + } + + function zdecr($name, $key, $score=1){ + $args = func_get_args(); + return $this->__call(__FUNCTION__, $args); + } + + function zadd($key, $score, $value){ + $args = array($key, $value, $score); + return $this->__call('zset', $args); + } + + function zRevRank($name, $key){ + $args = func_get_args(); + return $this->__call("zrrank", $args); + } + + function zRevRange($name, $offset, $limit){ + $args = func_get_args(); + return $this->__call("zrrange", $args); + } + + function hincr($name, $key, $val=1){ + $args = func_get_args(); + return $this->__call(__FUNCTION__, $args); + } + + function hdecr($name, $key, $val=1){ + $args = func_get_args(); + return $this->__call(__FUNCTION__, $args); + } + + private function send_req($cmd, $params){ + $req = array($cmd); + foreach($params as $p){ + if(is_array($p)){ + $req = array_merge($req, $p); + }else{ + $req[] = $p; + } + } + return $this->send($req); + } + + private function recv_resp($cmd, $params){ + $resp = $this->recv(); + if($resp === false){ + return new SSDB_Response('error', 'Unknown error'); + }else if(!$resp){ + return new SSDB_Response('disconnected', 'Connection closed'); + } + if($resp[0] == 'noauth'){ + $errmsg = isset($resp[1])? $resp[1] : ''; + return new SSDB_Response($resp[0], $errmsg); + } + switch($cmd){ + case 'dbsize': + case 'ping': + case 'qset': + case 'getbit': + case 'setbit': + case 'countbit': + case 'strlen': + case 'set': + case 'setx': + case 'setnx': + case 'zset': + case 'hset': + case 'qpush': + case 'qpush_front': + case 'qpush_back': + case 'qtrim_front': + case 'qtrim_back': + case 'del': + case 'zdel': + case 'hdel': + case 'hsize': + case 'zsize': + case 'qsize': + case 'hclear': + case 'zclear': + case 'qclear': + case 'multi_set': + case 'multi_del': + case 'multi_hset': + case 'multi_hdel': + case 'multi_zset': + case 'multi_zdel': + case 'incr': + case 'decr': + case 'zincr': + case 'zdecr': + case 'hincr': + case 'hdecr': + case 'zget': + case 'zrank': + case 'zrrank': + case 'zcount': + case 'zsum': + case 'zremrangebyrank': + case 'zremrangebyscore': + case 'ttl': + case 'expire': + if($resp[0] == 'ok'){ + $val = isset($resp[1])? intval($resp[1]) : 0; + return new SSDB_Response($resp[0], $val); + }else{ + $errmsg = isset($resp[1])? $resp[1] : ''; + return new SSDB_Response($resp[0], $errmsg); + } + case 'zavg': + if($resp[0] == 'ok'){ + $val = isset($resp[1])? floatval($resp[1]) : (float)0; + return new SSDB_Response($resp[0], $val); + }else{ + $errmsg = isset($resp[1])? $resp[1] : ''; + return new SSDB_Response($resp[0], $errmsg); + } + case 'get': + case 'substr': + case 'getset': + case 'hget': + case 'qget': + case 'qfront': + case 'qback': + if($resp[0] == 'ok'){ + if(count($resp) == 2){ + return new SSDB_Response('ok', $resp[1]); + }else{ + return new SSDB_Response('server_error', 'Invalid response'); + } + }else{ + $errmsg = isset($resp[1])? $resp[1] : ''; + return new SSDB_Response($resp[0], $errmsg); + } + break; + case 'qpop': + case 'qpop_front': + case 'qpop_back': + if($resp[0] == 'ok'){ + $size = 1; + if(isset($params[1])){ + $size = intval($params[1]); + } + if($size <= 1){ + if(count($resp) == 2){ + return new SSDB_Response('ok', $resp[1]); + }else{ + return new SSDB_Response('server_error', 'Invalid response'); + } + }else{ + $data = array_slice($resp, 1); + return new SSDB_Response('ok', $data); + } + }else{ + $errmsg = isset($resp[1])? $resp[1] : ''; + return new SSDB_Response($resp[0], $errmsg); + } + break; + case 'keys': + case 'zkeys': + case 'hkeys': + case 'hlist': + case 'zlist': + case 'qslice': + if($resp[0] == 'ok'){ + $data = array(); + if($resp[0] == 'ok'){ + $data = array_slice($resp, 1); + } + return new SSDB_Response($resp[0], $data); + }else{ + $errmsg = isset($resp[1])? $resp[1] : ''; + return new SSDB_Response($resp[0], $errmsg); + } + case 'auth': + case 'exists': + case 'hexists': + case 'zexists': + if($resp[0] == 'ok'){ + if(count($resp) == 2){ + return new SSDB_Response('ok', (bool)$resp[1]); + }else{ + return new SSDB_Response('server_error', 'Invalid response'); + } + }else{ + $errmsg = isset($resp[1])? $resp[1] : ''; + return new SSDB_Response($resp[0], $errmsg); + } + break; + case 'multi_exists': + case 'multi_hexists': + case 'multi_zexists': + if($resp[0] == 'ok'){ + if(count($resp) % 2 == 1){ + $data = array(); + for($i=1; $idebug){ + echo '> ' . str_replace(array("\r", "\n"), array('\r', '\n'), $s) . "\n"; + } + try{ + while(true){ + $ret = @fwrite($this->sock, $s); + if($ret === false || $ret === 0){ + $this->close(); + throw new SSDBException('Connection lost'); + } + $s = substr($s, $ret); + if(strlen($s) == 0){ + break; + } + @fflush($this->sock); + } + }catch(Exception $e){ + $this->close(); + throw new SSDBException($e->getMessage()); + } + return $ret; + } + + function recv(){ + $this->step = self::STEP_SIZE; + while(true){ + $ret = $this->parse(); + if($ret === null){ + try{ + $data = @fread($this->sock, 1024 * 1024); + if($this->debug){ + echo '< ' . str_replace(array("\r", "\n"), array('\r', '\n'), $data) . "\n"; + } + }catch(Exception $e){ + $data = ''; + } + if($data === false || $data === ''){ + if(feof($this->sock)){ + $this->close(); + throw new SSDBException('Connection lost'); + }else{ + throw new SSDBTimeoutException('Connection timeout'); + } + } + $this->recv_buf .= $data; +# echo "read " . strlen($data) . " total: " . strlen($this->recv_buf) . "\n"; + }else{ + return $ret; + } + } + } + + const STEP_SIZE = 0; + const STEP_DATA = 1; + public $resp = array(); + public $step; + public $block_size; + + private function parse(){ + $spos = 0; + $epos = 0; + $buf_size = strlen($this->recv_buf); + // performance issue for large reponse + //$this->recv_buf = ltrim($this->recv_buf); + while(true){ + $spos = $epos; + if($this->step === self::STEP_SIZE){ + $epos = strpos($this->recv_buf, "\n", $spos); + if($epos === false){ + break; + } + $epos += 1; + $line = substr($this->recv_buf, $spos, $epos - $spos); + $spos = $epos; + + $line = trim($line); + if(strlen($line) == 0){ // head end + $this->recv_buf = substr($this->recv_buf, $spos); + $ret = $this->resp; + $this->resp = array(); + return $ret; + } + $this->block_size = intval($line); + $this->step = self::STEP_DATA; + } + if($this->step === self::STEP_DATA){ + $epos = $spos + $this->block_size; + if($epos <= $buf_size){ + $n = strpos($this->recv_buf, "\n", $epos); + if($n !== false){ + $data = substr($this->recv_buf, $spos, $epos - $spos); + $this->resp[] = $data; + $epos = $n + 1; + $this->step = self::STEP_SIZE; + continue; + } + } + break; + } + } + + // packet not ready + if($spos > 0){ + $this->recv_buf = substr($this->recv_buf, $spos); + } + return null; + } +} \ No newline at end of file diff --git a/app/Libs/SSDBException.php b/app/Libs/SSDBException.php new file mode 100644 index 0000000..dcf534f --- /dev/null +++ b/app/Libs/SSDBException.php @@ -0,0 +1,24 @@ +log->eventInTask('ssdb_exception', ['msg' => $message]); + parent::__construct($message, $code, $previous); + } + +} \ No newline at end of file diff --git a/app/Libs/SSDBTimeoutException.php b/app/Libs/SSDBTimeoutException.php new file mode 100644 index 0000000..93609f2 --- /dev/null +++ b/app/Libs/SSDBTimeoutException.php @@ -0,0 +1,23 @@ +log->eventInTask('ssdb_timeout_exception', ['msg' => $message]); + parent::__construct($message, $code, $previous); + } + +} \ No newline at end of file diff --git a/app/Libs/SSDB_Response.php b/app/Libs/SSDB_Response.php new file mode 100644 index 0000000..de49aa4 --- /dev/null +++ b/app/Libs/SSDB_Response.php @@ -0,0 +1,37 @@ +code = $code; + if($code == 'ok'){ + $this->data = $data_or_message; + }else{ + $this->message = $data_or_message; + } + } + + function __toString(){ + if($this->code == 'ok'){ + $s = $this->data === null? '' : json_encode($this->data); + }else{ + $s = $this->message; + } + return sprintf('%-13s %12s %s', $this->cmd, $this->code, $s); + } + + function ok(){ + return $this->code == 'ok'; + } + + function not_found(){ + return $this->code == 'not_found'; + } +} \ No newline at end of file diff --git a/app/Libs/SimpleSSDB.php b/app/Libs/SimpleSSDB.php index e8f6907..d3f987b 100644 --- a/app/Libs/SimpleSSDB.php +++ b/app/Libs/SimpleSSDB.php @@ -9,16 +9,6 @@ */ namespace App\Libs; -use Exception; - - -class SSDBException extends Exception -{ -} - -class SSDBTimeoutException extends SSDBException -{ -} /** * All methods(except *exists) returns false on error, @@ -30,593 +20,4 @@ class SimpleSSDB extends SSDB parent::__construct($host, $port, $timeout_ms); $this->easy(); } -} - -class SSDB_Response -{ - public $cmd; - public $code; - public $data = null; - public $message; - - function __construct($code='ok', $data_or_message=null){ - $this->code = $code; - if($code == 'ok'){ - $this->data = $data_or_message; - }else{ - $this->message = $data_or_message; - } - } - - function __toString(){ - if($this->code == 'ok'){ - $s = $this->data === null? '' : json_encode($this->data); - }else{ - $s = $this->message; - } - return sprintf('%-13s %12s %s', $this->cmd, $this->code, $s); - } - - function ok(){ - return $this->code == 'ok'; - } - - function not_found(){ - return $this->code == 'not_found'; - } -} - -// Depricated, use SimpleSSDB instead! -class SSDB -{ - private $debug = false; - public $sock = null; - private $_closed = false; - private $recv_buf = ''; - private $_easy = false; - public $last_resp = null; - - function __construct($host, $port, $timeout_ms=2000){ - $timeout_f = (float)$timeout_ms/1000; - $this->sock = @stream_socket_client("$host:$port", $errno, $errstr, $timeout_f); - if(!$this->sock){ - throw new SSDBException("$errno: $errstr"); - } - $timeout_sec = intval($timeout_ms/1000); - $timeout_usec = ($timeout_ms - $timeout_sec * 1000) * 1000; - @stream_set_timeout($this->sock, $timeout_sec, $timeout_usec); - if(function_exists('stream_set_chunk_size')){ - @stream_set_chunk_size($this->sock, 1024 * 1024); - } - } - - function set_timeout($timeout_ms){ - $timeout_sec = intval($timeout_ms/1000); - $timeout_usec = ($timeout_ms - $timeout_sec * 1000) * 1000; - @stream_set_timeout($this->sock, $timeout_sec, $timeout_usec); - } - - /** - * After this method invoked with yesno=true, all requesting methods - * will not return a SSDB_Response object. - * And some certain methods like get/zget will return false - * when response is not ok(not_found, etc) - */ - function easy(){ - $this->_easy = true; - } - - function close(){ - if(!$this->_closed){ - @fclose($this->sock); - $this->_closed = true; - $this->sock = null; - } - } - - function closed(){ - return $this->_closed; - } - - private $batch_mode = false; - private $batch_cmds = array(); - - function batch(){ - $this->batch_mode = true; - $this->batch_cmds = array(); - return $this; - } - - function multi(){ - return $this->batch(); - } - - function exec(){ - $ret = array(); - foreach($this->batch_cmds as $op){ - list($cmd, $params) = $op; - $this->send_req($cmd, $params); - } - foreach($this->batch_cmds as $op){ - list($cmd, $params) = $op; - $resp = $this->recv_resp($cmd, $params); - $resp = $this->check_easy_resp($cmd, $resp); - $ret[] = $resp; - } - $this->batch_mode = false; - $this->batch_cmds = array(); - return $ret; - } - - function request(){ - $args = func_get_args(); - $cmd = array_shift($args); - return $this->__call($cmd, $args); - } - - private $async_auth_password = null; - - function auth($password){ - $this->async_auth_password = $password; - return null; - } - - function __call($cmd, $params=array()){ - $cmd = strtolower($cmd); - if($this->async_auth_password !== null){ - $pass = $this->async_auth_password; - $this->async_auth_password = null; - $auth = $this->__call('auth', array($pass)); - if($auth !== true){ - throw new Exception("Authentication failed"); - } - } - - if($this->batch_mode){ - $this->batch_cmds[] = array($cmd, $params); - return $this; - } - - try{ - if($this->send_req($cmd, $params) === false){ - $resp = new SSDB_Response('error', 'send error'); - }else{ - $resp = $this->recv_resp($cmd, $params); - } - }catch(SSDBException $e){ - if($this->_easy){ - throw $e; - }else{ - $resp = new SSDB_Response('error', $e->getMessage()); - } - } - - if($resp->code == 'noauth'){ - $msg = $resp->message; - throw new Exception($msg); - } - - $resp = $this->check_easy_resp($cmd, $resp); - return $resp; - } - - private function check_easy_resp($cmd, $resp){ - $this->last_resp = $resp; - if($this->_easy){ - if($resp->not_found()){ - return NULL; - }else if(!$resp->ok() && !is_array($resp->data)){ - return false; - }else{ - return $resp->data; - } - }else{ - $resp->cmd = $cmd; - return $resp; - } - } - - function multi_set($kvs=array()){ - $args = array(); - foreach($kvs as $k=>$v){ - $args[] = $k; - $args[] = $v; - } - return $this->__call(__FUNCTION__, $args); - } - - function multi_hset($name, $kvs=array()){ - $args = array($name); - foreach($kvs as $k=>$v){ - $args[] = $k; - $args[] = $v; - } - return $this->__call(__FUNCTION__, $args); - } - - function multi_zset($name, $kvs=array()){ - $args = array($name); - foreach($kvs as $k=>$v){ - $args[] = $k; - $args[] = $v; - } - return $this->__call(__FUNCTION__, $args); - } - - function incr($key, $val=1){ - $args = func_get_args(); - return $this->__call(__FUNCTION__, $args); - } - - function decr($key, $val=1){ - $args = func_get_args(); - return $this->__call(__FUNCTION__, $args); - } - - function zincr($name, $key, $score=1){ - $args = func_get_args(); - return $this->__call(__FUNCTION__, $args); - } - - function zdecr($name, $key, $score=1){ - $args = func_get_args(); - return $this->__call(__FUNCTION__, $args); - } - - function zadd($key, $score, $value){ - $args = array($key, $value, $score); - return $this->__call('zset', $args); - } - - function zRevRank($name, $key){ - $args = func_get_args(); - return $this->__call("zrrank", $args); - } - - function zRevRange($name, $offset, $limit){ - $args = func_get_args(); - return $this->__call("zrrange", $args); - } - - function hincr($name, $key, $val=1){ - $args = func_get_args(); - return $this->__call(__FUNCTION__, $args); - } - - function hdecr($name, $key, $val=1){ - $args = func_get_args(); - return $this->__call(__FUNCTION__, $args); - } - - private function send_req($cmd, $params){ - $req = array($cmd); - foreach($params as $p){ - if(is_array($p)){ - $req = array_merge($req, $p); - }else{ - $req[] = $p; - } - } - return $this->send($req); - } - - private function recv_resp($cmd, $params){ - $resp = $this->recv(); - if($resp === false){ - return new SSDB_Response('error', 'Unknown error'); - }else if(!$resp){ - return new SSDB_Response('disconnected', 'Connection closed'); - } - if($resp[0] == 'noauth'){ - $errmsg = isset($resp[1])? $resp[1] : ''; - return new SSDB_Response($resp[0], $errmsg); - } - switch($cmd){ - case 'dbsize': - case 'ping': - case 'qset': - case 'getbit': - case 'setbit': - case 'countbit': - case 'strlen': - case 'set': - case 'setx': - case 'setnx': - case 'zset': - case 'hset': - case 'qpush': - case 'qpush_front': - case 'qpush_back': - case 'qtrim_front': - case 'qtrim_back': - case 'del': - case 'zdel': - case 'hdel': - case 'hsize': - case 'zsize': - case 'qsize': - case 'hclear': - case 'zclear': - case 'qclear': - case 'multi_set': - case 'multi_del': - case 'multi_hset': - case 'multi_hdel': - case 'multi_zset': - case 'multi_zdel': - case 'incr': - case 'decr': - case 'zincr': - case 'zdecr': - case 'hincr': - case 'hdecr': - case 'zget': - case 'zrank': - case 'zrrank': - case 'zcount': - case 'zsum': - case 'zremrangebyrank': - case 'zremrangebyscore': - case 'ttl': - case 'expire': - if($resp[0] == 'ok'){ - $val = isset($resp[1])? intval($resp[1]) : 0; - return new SSDB_Response($resp[0], $val); - }else{ - $errmsg = isset($resp[1])? $resp[1] : ''; - return new SSDB_Response($resp[0], $errmsg); - } - case 'zavg': - if($resp[0] == 'ok'){ - $val = isset($resp[1])? floatval($resp[1]) : (float)0; - return new SSDB_Response($resp[0], $val); - }else{ - $errmsg = isset($resp[1])? $resp[1] : ''; - return new SSDB_Response($resp[0], $errmsg); - } - case 'get': - case 'substr': - case 'getset': - case 'hget': - case 'qget': - case 'qfront': - case 'qback': - if($resp[0] == 'ok'){ - if(count($resp) == 2){ - return new SSDB_Response('ok', $resp[1]); - }else{ - return new SSDB_Response('server_error', 'Invalid response'); - } - }else{ - $errmsg = isset($resp[1])? $resp[1] : ''; - return new SSDB_Response($resp[0], $errmsg); - } - break; - case 'qpop': - case 'qpop_front': - case 'qpop_back': - if($resp[0] == 'ok'){ - $size = 1; - if(isset($params[1])){ - $size = intval($params[1]); - } - if($size <= 1){ - if(count($resp) == 2){ - return new SSDB_Response('ok', $resp[1]); - }else{ - return new SSDB_Response('server_error', 'Invalid response'); - } - }else{ - $data = array_slice($resp, 1); - return new SSDB_Response('ok', $data); - } - }else{ - $errmsg = isset($resp[1])? $resp[1] : ''; - return new SSDB_Response($resp[0], $errmsg); - } - break; - case 'keys': - case 'zkeys': - case 'hkeys': - case 'hlist': - case 'zlist': - case 'qslice': - if($resp[0] == 'ok'){ - $data = array(); - if($resp[0] == 'ok'){ - $data = array_slice($resp, 1); - } - return new SSDB_Response($resp[0], $data); - }else{ - $errmsg = isset($resp[1])? $resp[1] : ''; - return new SSDB_Response($resp[0], $errmsg); - } - case 'auth': - case 'exists': - case 'hexists': - case 'zexists': - if($resp[0] == 'ok'){ - if(count($resp) == 2){ - return new SSDB_Response('ok', (bool)$resp[1]); - }else{ - return new SSDB_Response('server_error', 'Invalid response'); - } - }else{ - $errmsg = isset($resp[1])? $resp[1] : ''; - return new SSDB_Response($resp[0], $errmsg); - } - break; - case 'multi_exists': - case 'multi_hexists': - case 'multi_zexists': - if($resp[0] == 'ok'){ - if(count($resp) % 2 == 1){ - $data = array(); - for($i=1; $idebug){ - echo '> ' . str_replace(array("\r", "\n"), array('\r', '\n'), $s) . "\n"; - } - try{ - while(true){ - $ret = @fwrite($this->sock, $s); - if($ret === false || $ret === 0){ - $this->close(); - throw new SSDBException('Connection lost'); - } - $s = substr($s, $ret); - if(strlen($s) == 0){ - break; - } - @fflush($this->sock); - } - }catch(Exception $e){ - $this->close(); - throw new SSDBException($e->getMessage()); - } - return $ret; - } - - function recv(){ - $this->step = self::STEP_SIZE; - while(true){ - $ret = $this->parse(); - if($ret === null){ - try{ - $data = @fread($this->sock, 1024 * 1024); - if($this->debug){ - echo '< ' . str_replace(array("\r", "\n"), array('\r', '\n'), $data) . "\n"; - } - }catch(Exception $e){ - $data = ''; - } - if($data === false || $data === ''){ - if(feof($this->sock)){ - $this->close(); - throw new SSDBException('Connection lost'); - }else{ - throw new SSDBTimeoutException('Connection timeout'); - } - } - $this->recv_buf .= $data; -# echo "read " . strlen($data) . " total: " . strlen($this->recv_buf) . "\n"; - }else{ - return $ret; - } - } - } - - const STEP_SIZE = 0; - const STEP_DATA = 1; - public $resp = array(); - public $step; - public $block_size; - - private function parse(){ - $spos = 0; - $epos = 0; - $buf_size = strlen($this->recv_buf); - // performance issue for large reponse - //$this->recv_buf = ltrim($this->recv_buf); - while(true){ - $spos = $epos; - if($this->step === self::STEP_SIZE){ - $epos = strpos($this->recv_buf, "\n", $spos); - if($epos === false){ - break; - } - $epos += 1; - $line = substr($this->recv_buf, $spos, $epos - $spos); - $spos = $epos; - - $line = trim($line); - if(strlen($line) == 0){ // head end - $this->recv_buf = substr($this->recv_buf, $spos); - $ret = $this->resp; - $this->resp = array(); - return $ret; - } - $this->block_size = intval($line); - $this->step = self::STEP_DATA; - } - if($this->step === self::STEP_DATA){ - $epos = $spos + $this->block_size; - if($epos <= $buf_size){ - $n = strpos($this->recv_buf, "\n", $epos); - if($n !== false){ - $data = substr($this->recv_buf, $spos, $epos - $spos); - $this->resp[] = $data; - $epos = $n + 1; - $this->step = self::STEP_SIZE; - continue; - } - } - break; - } - } - - // packet not ready - if($spos > 0){ - $this->recv_buf = substr($this->recv_buf, $spos); - } - return null; - } } \ No newline at end of file