support new protocol

This commit is contained in:
Evgeny Zinoviev 2021-03-01 02:03:07 +03:00
parent c66fc2f691
commit 7b44cbe272
6 changed files with 266 additions and 35 deletions

View File

@ -9,11 +9,14 @@ class Client {
const MASTER_PORT = 7081;
const EOT = "\4";
const REQUEST_NO_LIMIT = 999999;
protected $host;
protected $port;
protected $password;
protected $sock;
protected $passwordSent = false;
protected $lastOutgoingRequestNo = null;
/**
* JobdClient constructor.
@ -30,6 +33,9 @@ class Client {
$this->sock = fsockopen($this->host, $this->port);
if (!$this->sock)
throw new \Exception("Failed to connect to {$this->host}:{$this->port}");
// 0 is reserved
$this->lastOutgoingRequestNo = mt_rand(1, self::REQUEST_NO_LIMIT);
}
/**
@ -37,7 +43,7 @@ class Client {
* @throws \Exception
*/
public function ping() {
$this->send(new RequestMessage('ping'));
$this->send(new PingMessage());
return $this->recv();
}
@ -47,8 +53,9 @@ class Client {
* @throws \Exception
*/
public function poke(array $targets) {
$this->send(new RequestMessage('poke', ['targets' => $targets]));
return $this->recv();
return $this->recv(
$this->sendRequest(new RequestMessage('poke', ['targets' => $targets]))
);
}
/**
@ -56,8 +63,9 @@ class Client {
* @throws \Exception
*/
public function status() {
$this->send(new RequestMessage('status'));
return $this->recv();
return $this->recv(
$this->sendRequest(new RequestMessage('status'))
);
}
/**
@ -66,8 +74,9 @@ class Client {
* @throws \Exception
*/
public function poll(array $targets) {
$this->send(new RequestMessage('poll', ['targets' => $targets]));
return $this->recv();
return $this->recv(
$this->sendRequest(new RequestMessage('poll', ['targets' => $targets]))
);
}
/**
@ -75,27 +84,43 @@ class Client {
* @return ResponseMessage
*/
public function runManual(int $id) {
$this->send(new RequestMessage('run-manual', ['id' => $id]));
return $this->recv();
return $this->recv(
$this->sendRequest(new RequestMessage('run-manual', ['id' => $id]))
);
}
/**
* @param RequestMessage $request
* @return int
*/
public function send(RequestMessage $request) {
if ($this->password)
public function sendRequest(RequestMessage $request) {
if ($this->password && !$this->passwordSent) {
$request->setPassword($this->password);
$this->passwordSent = true;
}
$serialized = $request->serialize();
$no = $this->getNextOutgoingRequestNo();
$request->setRequestNo($no);
$this->send($request);
return $no;
}
/**
* @param Message $message
*/
protected function send(Message $message) {
$serialized = $message->serialize();
fwrite($this->sock, $serialized . self::EOT);
}
/**
* @param int $request_no
* @return ResponseMessage
* @throws \Exception
*/
public function recv() {
public function recv(int $request_no = -1) {
$messages = [];
$buf = '';
while (!feof($this->sock)) {
@ -117,22 +142,73 @@ class Client {
}
} while ($eot_pos !== false && $offset < $buflen-1);
if (empty($message))
if (empty($messages))
throw new \Exception("Malformed response: no messages found. Response: {$buf}");
if (count($messages) > 1)
trigger_error(__METHOD__.": received more than one message");
$response = self::parseMessage($messages[0]);
if (!($response instanceof ResponseMessage))
throw new \Exception('Unexpected message type');
$response = null;
$messages = array_map('self::parseMessage', $messages);
if ($request_no != -1) {
/**
* @var ResponseMessage[] $messages
*/
$messages = array_filter(
$messages,
if ($error = $response->getError())
throw new \Exception('jobd error: '.$response->getError());
/**
* @param ResponseMessage|RequestMessage $message
*/
function(Message $message) use ($request_no) {
return $message instanceof ResponseMessage
&& ($message->getRequestNo() === $request_no || $message->getRequestNo() === 0);
}
);
if (empty($messages))
throw new \Exception("Malformed response: response for {$request_no} not found.");
if (count($messages) == 2) {
// weird, we caught response for our $request_no AND a message with reserved zero no
// but anyway
for ($i = 0; $i < count($messages); $i++) {
$message = $messages[$i];
if ($message->getRequestNo() == $request_no)
$response = $message;
else if ($message->getRequestNo() == 0)
trigger_error(__METHOD__.': received an error with reqno=0: '.($message->getError() ?? null));
}
}
}
if (is_null($response))
$response = $messages[0];
if ($response instanceof ResponseMessage) {
if ($error = $response->getError())
throw new \Exception($response->getError());
}
return $response;
}
/**
* @return int
*/
protected function getNextOutgoingRequestNo() {
$this->lastOutgoingRequestNo++;
if ($this->lastOutgoingRequestNo >= self::REQUEST_NO_LIMIT)
$this->lastOutgoingRequestNo = 1; // 0 is reserved
return $this->lastOutgoingRequestNo;
}
/**
* @param string $raw_string
* @return RequestMessage|ResponseMessage
@ -140,34 +216,117 @@ class Client {
*/
protected static function parseMessage(string $raw_string) {
$raw = json_decode($raw_string, true);
if (!is_array($raw) || count($raw) != 2)
if (!is_array($raw) || count($raw) < 1)
throw new \Exception("Malformed message: {$raw_string}");
list($type, $data) = $raw;
list($type) = $raw;
switch ($type) {
case Message::REQUEST:
if (!$data || !is_array($data) || !isset($data['type']) || !is_string($data['type']))
throw new \Exception('Malformed REQUEST message');
$data = $raw[1];
try {
self::validateData($data, [
// name type required
['type', 's', true],
['no', 'i', true],
['password', 's', false],
['data', 'aifs', false]
]);
} catch (\Exception $e) {
throw new \Exception("Malformed REQUEST message: {$e->getMessage()}");
}
$message = new RequestMessage($data['type'], $data['data'] ?? null);
$message = new RequestMessage($data['no'], $data['type'], $data['data'] ?? null);
if (isset($data['password']))
$message->setPassword($data['password']);
return $message;
case Message::RESPONSE:
if (!is_array($data) || count($data) < 2)
throw new \Exception('Malformed RESPONSE message');
$data = $raw[1];
try {
self::validateData($data, [
// name type required
['no', 'i', true],
['data', 'aifs', false],
['error', 's', false],
]);
} catch (\Exception $e) {
throw new \Exception("Malformed RESPONSE message: {$e->getMessage()}");
}
$message = new ResponseMessage(...$data);
return $message;
return new ResponseMessage($data['no'], $data['error'] ?? null, $data['data'] ?? null);
case Message::PING:
return new PingMessage();
break;
case Message::PONG:
return new PongMessage();
break;
default:
throw new \Exception("Malformed message: unexpected type {$type}");
}
}
/**
* @param mixed $data
* @param array $schema
* @return bool
*/
protected static function validateData($data, array $schema) {
if (!$data || !is_array($data))
throw new \Exception('data must be array');
foreach ($schema as $schema_item) {
list ($key_name, $key_types, $key_required) = $schema_item;
if (!isset($data[$key_name])) {
if ($key_required)
throw new \Exception("'{$key_name}' is missing");
continue;
}
$passed = false;
for ($i = 0; $i < strlen($key_types); $i++) {
$type = $key_types[$i];
switch ($type) {
case 'i':
if (is_int($data[$key_name]))
$passed = true;
break;
case 'f':
if (is_float($data[$key_name]))
$passed = true;
break;
case 's':
if (is_string($data[$key_name]))
$passed = true;
break;
case 'a':
if (is_array($data[$key_name]))
$passed = true;
break;
default:
trigger_error(__METHOD__.': unexpected type '.$type);
break;
}
if ($passed)
break;
}
if (!$passed)
throw new \Exception("{$key_name}: required type is '{$key_types}'");
}
}
/**
* @return bool
*/

View File

@ -7,6 +7,8 @@ abstract class Message {
const REQUEST = 0;
const RESPONSE = 1;
const PING = 2;
const PONG = 3;
protected $type;
@ -27,10 +29,13 @@ abstract class Message {
* @return string
*/
public function serialize(): string {
return json_encode([
$this->type,
$this->getContent()
]);
$data = [$this->type];
$content = $this->getContent();
if (!empty($content))
$data[] = $content;
return json_encode($data);
}
}

15
src/PingMessage.php Normal file
View File

@ -0,0 +1,15 @@
<?php
namespace jobd;
class PingMessage extends Message {
public function __construct() {
parent::__construct(Message::PING);
}
protected function getContent(): array {
return [];
}
}

15
src/PongMessage.php Normal file
View File

@ -0,0 +1,15 @@
<?php
namespace jobd;
class PongMessage extends Message {
public function __construct() {
parent::__construct(Message::PING);
}
protected function getContent(): array {
return [];
}
}

View File

@ -4,6 +4,7 @@ namespace jobd;
class RequestMessage extends Message {
protected $requestNo;
protected $requestType;
protected $requestData;
protected $password;
@ -27,13 +28,28 @@ class RequestMessage extends Message {
$this->password = $password;
}
/**
* @param int $no
*/
public function setRequestNo(int $no) {
$this->requestNo = $no;
}
/**
* @return string[]
*/
protected function getContent(): array {
$request = ['type' => $this->requestType];
$request = [
'type' => $this->requestType,
'no' => $this->requestNo,
];
if (!is_null($this->requestData))
$request['data'] = $this->requestData;
if (!is_null($this->password))
$request['password'] = $this->password;
return $request;
}

View File

@ -4,17 +4,21 @@ namespace jobd;
class ResponseMessage extends Message {
protected $requestNo;
protected $error;
protected $data;
/**
* Response constructor.
*
* @param int $requestNo
* @param null $error
* @param null $data
*/
public function __construct($error = null, $data = null) {
public function __construct($request_no, $error = null, $data = null) {
parent::__construct(Message::RESPONSE);
$this->requestNo = $request_no;
$this->error = $error;
$this->data = $data;
}
@ -23,7 +27,17 @@ class ResponseMessage extends Message {
* @return array
*/
protected function getContent(): array {
return [$this->error, $this->data];
$response = [
'no' => $this->requestNo
];
if (!is_null($this->error))
$response['error'] = $this->error;
if (!is_null(!$this->data))
$response['data'] = $this->data;
return $response;
}
/**
@ -40,4 +54,11 @@ class ResponseMessage extends Message {
return $this->data;
}
/**
* @return int
*/
public function getRequestNo() {
return $this->requestNo;
}
}