1.6.0: support signals

This commit is contained in:
Evgeny Zinoviev 2023-04-13 02:14:53 +03:00
parent 5272c5f541
commit d0fa8d6f63
15 changed files with 136 additions and 45 deletions

1
.gitignore vendored
View File

@ -1 +1,2 @@
.idea .idea
/vendor

View File

@ -19,7 +19,7 @@ Here's a small example.
```php ```php
try { try {
$jobd = new jobd\MasterClient(); $jobd = new jobd\MasterClient();
} catch (jobd\Exception $e) { } catch (\jobd\exceptions\JobdException $e) {
die("Failed to connect.\n"); die("Failed to connect.\n");
} }
@ -29,7 +29,7 @@ try {
// get status from master // get status from master
$status = $jobd->status()->getData(); $status = $jobd->status()->getData();
} catch (jobd\Exception $e) { } catch (\jobd\exceptions\JobdException $e) {
die('jobd error: '.$e->getMessage()."\n"); die('jobd error: '.$e->getMessage()."\n");
} }

View File

@ -1,6 +1,6 @@
{ {
"name": "ch1p/jobd-client", "name": "ch1p/jobd-client",
"version": "1.5.2", "version": "1.6.0",
"license": "BSD-2-Clause", "license": "BSD-2-Clause",
"keywords": ["queue", "job", "jobd"], "keywords": ["queue", "job", "jobd"],
"repositories": [ "repositories": [
@ -11,7 +11,9 @@
], ],
"autoload": { "autoload": {
"psr-4": { "psr-4": {
"jobd\\": "src/" "jobd\\": "src/",
"jobd\\messages\\": "src/messages",
"jobd\\exceptions\\": "src/exceptions"
} }
}, },
"require": { "require": {

22
composer.lock generated Normal file
View File

@ -0,0 +1,22 @@
{
"_readme": [
"This file locks the dependencies of your project to a known state",
"Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies",
"This file is @generated automatically"
],
"content-hash": "3ea2178ffa7d2dde56d1c2ac872d7c34",
"packages": [],
"packages-dev": [],
"aliases": [],
"minimum-stability": "stable",
"stability-flags": [],
"prefer-stable": false,
"prefer-lowest": false,
"platform": {
"php": ">=7.0",
"ext-json": "*",
"ext-sockets": "*"
},
"platform-dev": [],
"plugin-api-version": "2.3.0"
}

View File

@ -2,8 +2,14 @@
namespace jobd; namespace jobd;
use jobd\exceptions\JobdException;
use jobd\messages\Message;
use jobd\messages\PingMessage;
use jobd\messages\PongMessage;
use jobd\messages\RequestMessage;
use jobd\messages\ResponseMessage;
class Client { abstract class Client {
const WORKER_PORT = 7080; const WORKER_PORT = 7080;
const MASTER_PORT = 7081; const MASTER_PORT = 7081;
@ -23,7 +29,7 @@ class Client {
* @param int $port * @param int $port
* @param string $host * @param string $host
* @param string $password * @param string $password
* @throws Exception * @throws JobdException
*/ */
public function __construct(int $port, string $host = '127.0.0.1', string $password = '') public function __construct(int $port, string $host = '127.0.0.1', string $password = '')
{ {
@ -32,12 +38,12 @@ class Client {
$this->password = $password; $this->password = $password;
if (($socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP)) === false) if (($socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP)) === false)
throw new Exception("socket_create() failed: ".$this->getSocketError()); throw new JobdException("socket_create() failed: ".$this->getSocketError());
$this->sock = $socket; $this->sock = $socket;
if ((socket_connect($socket, $host, $port)) === false) if ((socket_connect($socket, $host, $port)) === false)
throw new Exception("socket_connect() failed: ".$this->getSocketError()); throw new JobdException("socket_connect() failed: ".$this->getSocketError());
$this->lastOutgoingRequestNo = mt_rand(1 /* 0 is reserved */, self::REQUEST_NO_LIMIT); $this->lastOutgoingRequestNo = mt_rand(1 /* 0 is reserved */, self::REQUEST_NO_LIMIT);
} }
@ -53,7 +59,7 @@ class Client {
/** /**
* @param string[] $targets * @param string[] $targets
* @return ResponseMessage * @return ResponseMessage
* @throws Exception * @throws JobdException
*/ */
public function pause(array $targets = []): ResponseMessage public function pause(array $targets = []): ResponseMessage
{ {
@ -69,7 +75,7 @@ class Client {
/** /**
* @param string[] $targets * @param string[] $targets
* @return ResponseMessage * @return ResponseMessage
* @throws Exception * @throws JobdException
*/ */
public function continue(array $targets = []): ResponseMessage public function continue(array $targets = []): ResponseMessage
{ {
@ -84,7 +90,7 @@ class Client {
/** /**
* @return PongMessage * @return PongMessage
* @throws Exception * @throws JobdException
*/ */
public function ping(): PongMessage public function ping(): PongMessage
{ {
@ -113,7 +119,7 @@ class Client {
/** /**
* @param Message $message * @param Message $message
* @throws Exception * @throws JobdException
*/ */
public function send(Message $message) public function send(Message $message)
{ {
@ -123,7 +129,7 @@ class Client {
while ($remained > 0) { while ($remained > 0) {
$result = socket_write($this->sock, $data); $result = socket_write($this->sock, $data);
if ($result === false) if ($result === false)
throw new Exception(__METHOD__ . ": socket_write() failed: ".$this->getSocketError()); throw new JobdException(__METHOD__ . ": socket_write() failed: ".$this->getSocketError());
$remained -= $result; $remained -= $result;
if ($remained > 0) if ($remained > 0)
@ -134,7 +140,7 @@ class Client {
/** /**
* @param int $request_no * @param int $request_no
* @return RequestMessage|ResponseMessage|PingMessage|PongMessage * @return RequestMessage|ResponseMessage|PingMessage|PongMessage
* @throws Exception * @throws JobdException
*/ */
public function recv(int $request_no = -1) public function recv(int $request_no = -1)
{ {
@ -145,7 +151,7 @@ class Client {
while (true) { while (true) {
$result = socket_recv($this->sock, $recv_buf, 1024, 0); $result = socket_recv($this->sock, $recv_buf, 1024, 0);
if ($result === false) if ($result === false)
throw new Exception(__METHOD__ . ": socket_recv() failed: " . $this->getSocketError()); throw new JobdException(__METHOD__ . ": socket_recv() failed: " . $this->getSocketError());
// peer disconnected // peer disconnected
if ($result === 0) if ($result === 0)
@ -172,7 +178,7 @@ class Client {
} while ($eot_pos !== false && $offset < $buflen-1); } while ($eot_pos !== false && $offset < $buflen-1);
if (empty($messages)) if (empty($messages))
throw new Exception("Malformed response: no messages found. Response: {$buf}"); throw new JobdException("Malformed response: no messages found. Response: {$buf}");
if (count($messages) > 1) if (count($messages) > 1)
trigger_error(__METHOD__.": received more than one message"); trigger_error(__METHOD__.": received more than one message");
@ -196,7 +202,7 @@ class Client {
); );
if (empty($messages)) if (empty($messages))
throw new Exception("Malformed response: response for {$request_no} not found."); throw new JobdException("Malformed response: response for {$request_no} not found.");
if (count($messages) == 2) { if (count($messages) == 2) {
@ -220,7 +226,7 @@ class Client {
if ($response instanceof ResponseMessage) { if ($response instanceof ResponseMessage) {
if ($error = $response->getError()) if ($error = $response->getError())
throw new Exception($response->getError()); throw new JobdException($response->getError());
} }
return $response; return $response;
@ -242,13 +248,13 @@ class Client {
/** /**
* @param string $raw_string * @param string $raw_string
* @return RequestMessage|ResponseMessage|PingMessage|PongMessage * @return RequestMessage|ResponseMessage|PingMessage|PongMessage
* @throws Exception * @throws JobdException
*/ */
protected static function parseMessage(string $raw_string) protected static function parseMessage(string $raw_string)
{ {
$raw = json_decode($raw_string, true); $raw = json_decode($raw_string, true);
if (!is_array($raw) || count($raw) < 1) if (!is_array($raw) || count($raw) < 1)
throw new Exception("Malformed message: {$raw_string}"); throw new JobdException("Malformed message: {$raw_string}");
list($type) = $raw; list($type) = $raw;
@ -263,8 +269,8 @@ class Client {
['password', 's', false], ['password', 's', false],
['data', 'a', false] ['data', 'a', false]
]); ]);
} catch (Exception $e) { } catch (JobdException $e) {
throw new Exception("Malformed REQUEST message: {$e->getMessage()}"); throw new JobdException("Malformed REQUEST message: {$e->getMessage()}");
} }
$message = new RequestMessage($data['type'], $data['data'] ?? null); $message = new RequestMessage($data['type'], $data['data'] ?? null);
@ -283,8 +289,8 @@ class Client {
['data', 'aifs', false], ['data', 'aifs', false],
['error', 's', false], ['error', 's', false],
]); ]);
} catch (Exception $e) { } catch (JobdException $e) {
throw new Exception("Malformed RESPONSE message: {$e->getMessage()}"); throw new JobdException("Malformed RESPONSE message: {$e->getMessage()}");
} }
return new ResponseMessage($data['no'], $data['error'] ?? null, $data['data'] ?? null); return new ResponseMessage($data['no'], $data['error'] ?? null, $data['data'] ?? null);
@ -296,25 +302,25 @@ class Client {
return new PongMessage(); return new PongMessage();
default: default:
throw new Exception("Malformed message: unexpected type {$type}"); throw new JobdException("Malformed message: unexpected type {$type}");
} }
} }
/** /**
* @param mixed $data * @param mixed $data
* @param array $schema * @param array $schema
* @throws Exception * @throws JobdException
*/ */
protected static function validateData($data, array $schema) protected static function validateData($data, array $schema)
{ {
if (!$data || !is_array($data)) if (!$data || !is_array($data))
throw new Exception('data must be array'); throw new JobdException('data must be array');
foreach ($schema as $schema_item) { foreach ($schema as $schema_item) {
list ($key_name, $key_types, $key_required) = $schema_item; list ($key_name, $key_types, $key_required) = $schema_item;
if (!isset($data[$key_name])) { if (!isset($data[$key_name])) {
if ($key_required) if ($key_required)
throw new Exception("'{$key_name}' is missing"); throw new JobdException("'{$key_name}' is missing");
continue; continue;
} }
@ -354,7 +360,7 @@ class Client {
} }
if (!$passed) if (!$passed)
throw new Exception("{$key_name}: required type is '{$key_types}'"); throw new JobdException("{$key_name}: required type is '{$key_types}'");
} }
} }

View File

@ -1,5 +0,0 @@
<?php
namespace jobd;
class Exception extends \Exception {}

View File

@ -2,6 +2,10 @@
namespace jobd; namespace jobd;
use jobd\exceptions\JobdException;
use jobd\messages\RequestMessage;
use jobd\messages\ResponseMessage;
class MasterClient extends Client { class MasterClient extends Client {
public function __construct(int $port = Client::MASTER_PORT, ...$args) public function __construct(int $port = Client::MASTER_PORT, ...$args)
@ -12,7 +16,7 @@ class MasterClient extends Client {
/** /**
* @param array $targets * @param array $targets
* @return ResponseMessage * @return ResponseMessage
* @throws Exception * @throws JobdException
*/ */
public function poke(array $targets): ResponseMessage public function poke(array $targets): ResponseMessage
{ {
@ -24,7 +28,7 @@ class MasterClient extends Client {
/** /**
* @param bool $poll_workers * @param bool $poll_workers
* @return ResponseMessage * @return ResponseMessage
* @throws Exception * @throws JobdException
*/ */
public function status(bool $poll_workers = false): ResponseMessage public function status(bool $poll_workers = false): ResponseMessage
{ {
@ -36,7 +40,7 @@ class MasterClient extends Client {
/** /**
* @param array[] $jobs * @param array[] $jobs
* @return ResponseMessage * @return ResponseMessage
* @throws Exception * @throws JobdException
*/ */
public function runManual(array $jobs): ResponseMessage public function runManual(array $jobs): ResponseMessage
{ {
@ -45,4 +49,33 @@ class MasterClient extends Client {
); );
} }
/**
* @param int $job_id
* @param int $signal
* @param string $target
* @return ResponseMessage
* @throws JobdException
*/
public function sendSignal(int $job_id, int $signal, string $target): ResponseMessage
{
return $this->sendSignals([
[
'id' => $job_id,
'signal' => $signal,
'target' => $target
]
]);
}
/**
* @param array $data
* @return ResponseMessage
* @throws JobdException
*/
public function sendSignals(array $data): ResponseMessage {
return $this->recv(
$this->sendRequest(new RequestMessage('send-signal', ['jobs' => $data]))
);
}
} }

View File

@ -2,6 +2,10 @@
namespace jobd; namespace jobd;
use jobd\exceptions\JobdException;
use jobd\messages\RequestMessage;
use jobd\messages\ResponseMessage;
class WorkerClient extends Client { class WorkerClient extends Client {
public function __construct(int $port = Client::WORKER_PORT, ...$args) public function __construct(int $port = Client::WORKER_PORT, ...$args)
@ -48,11 +52,23 @@ class WorkerClient extends Client {
); );
} }
/**
* @param array[] $jobs
* @return ResponseMessage
* @throws JobdException
*/
public function sendSignal(array $jobs): ResponseMessage
{
return $this->recv(
$this->sendRequest(new RequestMessage('send-signal', ['jobs' => $jobs]))
);
}
/** /**
* @param string $target * @param string $target
* @param int $concurrency * @param int $concurrency
* @return ResponseMessage * @return ResponseMessage
* @throws Exception * @throws JobdException
*/ */
public function addTarget(string $target, int $concurrency): ResponseMessage public function addTarget(string $target, int $concurrency): ResponseMessage
{ {
@ -67,7 +83,7 @@ class WorkerClient extends Client {
/** /**
* @param string $target * @param string $target
* @return ResponseMessage * @return ResponseMessage
* @throws Exception * @throws JobdException
*/ */
public function removeTarget(string $target): ResponseMessage public function removeTarget(string $target): ResponseMessage
{ {
@ -82,7 +98,7 @@ class WorkerClient extends Client {
* @param string $target * @param string $target
* @param int $concurrency * @param int $concurrency
* @return ResponseMessage * @return ResponseMessage
* @throws Exception * @throws JobdException
*/ */
public function setTargetConcurrency(string $target, int $concurrency): ResponseMessage public function setTargetConcurrency(string $target, int $concurrency): ResponseMessage
{ {

View File

@ -0,0 +1,11 @@
<?php
namespace jobd\exceptions;
class JobInterruptedException extends \Exception {
public function __construct(int $code = 1, string $message = "") {
parent::__construct($message, $code);
}
}

View File

@ -0,0 +1,5 @@
<?php
namespace jobd\exceptions;
class JobdException extends \Exception {}

View File

@ -1,6 +1,6 @@
<?php <?php
namespace jobd; namespace jobd\messages;
abstract class Message { abstract class Message {

View File

@ -1,6 +1,6 @@
<?php <?php
namespace jobd; namespace jobd\messages;
class PingMessage extends Message { class PingMessage extends Message {

View File

@ -1,6 +1,6 @@
<?php <?php
namespace jobd; namespace jobd\messages;
class PongMessage extends Message { class PongMessage extends Message {

View File

@ -1,6 +1,6 @@
<?php <?php
namespace jobd; namespace jobd\messages;
class RequestMessage extends Message { class RequestMessage extends Message {

View File

@ -1,6 +1,6 @@
<?php <?php
namespace jobd; namespace jobd\messages;
class ResponseMessage extends Message { class ResponseMessage extends Message {