diff --git a/.gitignore b/.gitignore index 485dee6..45b1244 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ .idea +/vendor \ No newline at end of file diff --git a/README.md b/README.md index 97d8d02..9d6993b 100644 --- a/README.md +++ b/README.md @@ -19,7 +19,7 @@ Here's a small example. ```php try { $jobd = new jobd\MasterClient(); -} catch (jobd\Exception $e) { +} catch (\jobd\exceptions\JobdException $e) { die("Failed to connect.\n"); } @@ -29,7 +29,7 @@ try { // get status from master $status = $jobd->status()->getData(); -} catch (jobd\Exception $e) { +} catch (\jobd\exceptions\JobdException $e) { die('jobd error: '.$e->getMessage()."\n"); } diff --git a/composer.json b/composer.json index f2dde7c..0fbb9f7 100644 --- a/composer.json +++ b/composer.json @@ -1,6 +1,6 @@ { "name": "ch1p/jobd-client", - "version": "1.5.2", + "version": "1.6.0", "license": "BSD-2-Clause", "keywords": ["queue", "job", "jobd"], "repositories": [ @@ -11,7 +11,9 @@ ], "autoload": { "psr-4": { - "jobd\\": "src/" + "jobd\\": "src/", + "jobd\\messages\\": "src/messages", + "jobd\\exceptions\\": "src/exceptions" } }, "require": { diff --git a/composer.lock b/composer.lock new file mode 100644 index 0000000..768bcea --- /dev/null +++ b/composer.lock @@ -0,0 +1,22 @@ +{ + "_readme": [ + "This file locks the dependencies of your project to a known state", + "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies", + "This file is @generated automatically" + ], + "content-hash": "3ea2178ffa7d2dde56d1c2ac872d7c34", + "packages": [], + "packages-dev": [], + "aliases": [], + "minimum-stability": "stable", + "stability-flags": [], + "prefer-stable": false, + "prefer-lowest": false, + "platform": { + "php": ">=7.0", + "ext-json": "*", + "ext-sockets": "*" + }, + "platform-dev": [], + "plugin-api-version": "2.3.0" +} diff --git a/src/Client.php b/src/Client.php index b4628ca..b66b6e7 100644 --- a/src/Client.php +++ b/src/Client.php @@ -2,8 +2,14 @@ namespace jobd; +use jobd\exceptions\JobdException; +use jobd\messages\Message; +use jobd\messages\PingMessage; +use jobd\messages\PongMessage; +use jobd\messages\RequestMessage; +use jobd\messages\ResponseMessage; -class Client { +abstract class Client { const WORKER_PORT = 7080; const MASTER_PORT = 7081; @@ -23,7 +29,7 @@ class Client { * @param int $port * @param string $host * @param string $password - * @throws Exception + * @throws JobdException */ public function __construct(int $port, string $host = '127.0.0.1', string $password = '') { @@ -32,12 +38,12 @@ class Client { $this->password = $password; if (($socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP)) === false) - throw new Exception("socket_create() failed: ".$this->getSocketError()); + throw new JobdException("socket_create() failed: ".$this->getSocketError()); $this->sock = $socket; if ((socket_connect($socket, $host, $port)) === false) - throw new Exception("socket_connect() failed: ".$this->getSocketError()); + throw new JobdException("socket_connect() failed: ".$this->getSocketError()); $this->lastOutgoingRequestNo = mt_rand(1 /* 0 is reserved */, self::REQUEST_NO_LIMIT); } @@ -53,7 +59,7 @@ class Client { /** * @param string[] $targets * @return ResponseMessage - * @throws Exception + * @throws JobdException */ public function pause(array $targets = []): ResponseMessage { @@ -69,7 +75,7 @@ class Client { /** * @param string[] $targets * @return ResponseMessage - * @throws Exception + * @throws JobdException */ public function continue(array $targets = []): ResponseMessage { @@ -84,7 +90,7 @@ class Client { /** * @return PongMessage - * @throws Exception + * @throws JobdException */ public function ping(): PongMessage { @@ -113,7 +119,7 @@ class Client { /** * @param Message $message - * @throws Exception + * @throws JobdException */ public function send(Message $message) { @@ -123,7 +129,7 @@ class Client { while ($remained > 0) { $result = socket_write($this->sock, $data); if ($result === false) - throw new Exception(__METHOD__ . ": socket_write() failed: ".$this->getSocketError()); + throw new JobdException(__METHOD__ . ": socket_write() failed: ".$this->getSocketError()); $remained -= $result; if ($remained > 0) @@ -134,7 +140,7 @@ class Client { /** * @param int $request_no * @return RequestMessage|ResponseMessage|PingMessage|PongMessage - * @throws Exception + * @throws JobdException */ public function recv(int $request_no = -1) { @@ -145,7 +151,7 @@ class Client { while (true) { $result = socket_recv($this->sock, $recv_buf, 1024, 0); if ($result === false) - throw new Exception(__METHOD__ . ": socket_recv() failed: " . $this->getSocketError()); + throw new JobdException(__METHOD__ . ": socket_recv() failed: " . $this->getSocketError()); // peer disconnected if ($result === 0) @@ -172,7 +178,7 @@ class Client { } while ($eot_pos !== false && $offset < $buflen-1); if (empty($messages)) - throw new Exception("Malformed response: no messages found. Response: {$buf}"); + throw new JobdException("Malformed response: no messages found. Response: {$buf}"); if (count($messages) > 1) trigger_error(__METHOD__.": received more than one message"); @@ -196,7 +202,7 @@ class Client { ); if (empty($messages)) - throw new Exception("Malformed response: response for {$request_no} not found."); + throw new JobdException("Malformed response: response for {$request_no} not found."); if (count($messages) == 2) { @@ -220,7 +226,7 @@ class Client { if ($response instanceof ResponseMessage) { if ($error = $response->getError()) - throw new Exception($response->getError()); + throw new JobdException($response->getError()); } return $response; @@ -242,13 +248,13 @@ class Client { /** * @param string $raw_string * @return RequestMessage|ResponseMessage|PingMessage|PongMessage - * @throws Exception + * @throws JobdException */ protected static function parseMessage(string $raw_string) { $raw = json_decode($raw_string, true); if (!is_array($raw) || count($raw) < 1) - throw new Exception("Malformed message: {$raw_string}"); + throw new JobdException("Malformed message: {$raw_string}"); list($type) = $raw; @@ -263,8 +269,8 @@ class Client { ['password', 's', false], ['data', 'a', false] ]); - } catch (Exception $e) { - throw new Exception("Malformed REQUEST message: {$e->getMessage()}"); + } catch (JobdException $e) { + throw new JobdException("Malformed REQUEST message: {$e->getMessage()}"); } $message = new RequestMessage($data['type'], $data['data'] ?? null); @@ -283,8 +289,8 @@ class Client { ['data', 'aifs', false], ['error', 's', false], ]); - } catch (Exception $e) { - throw new Exception("Malformed RESPONSE message: {$e->getMessage()}"); + } catch (JobdException $e) { + throw new JobdException("Malformed RESPONSE message: {$e->getMessage()}"); } return new ResponseMessage($data['no'], $data['error'] ?? null, $data['data'] ?? null); @@ -296,25 +302,25 @@ class Client { return new PongMessage(); default: - throw new Exception("Malformed message: unexpected type {$type}"); + throw new JobdException("Malformed message: unexpected type {$type}"); } } /** * @param mixed $data * @param array $schema - * @throws Exception + * @throws JobdException */ protected static function validateData($data, array $schema) { if (!$data || !is_array($data)) - throw new Exception('data must be array'); + throw new JobdException('data must be array'); foreach ($schema as $schema_item) { list ($key_name, $key_types, $key_required) = $schema_item; if (!isset($data[$key_name])) { if ($key_required) - throw new Exception("'{$key_name}' is missing"); + throw new JobdException("'{$key_name}' is missing"); continue; } @@ -354,7 +360,7 @@ class Client { } if (!$passed) - throw new Exception("{$key_name}: required type is '{$key_types}'"); + throw new JobdException("{$key_name}: required type is '{$key_types}'"); } } diff --git a/src/Exception.php b/src/Exception.php deleted file mode 100644 index be64d4c..0000000 --- a/src/Exception.php +++ /dev/null @@ -1,5 +0,0 @@ -sendSignals([ + [ + 'id' => $job_id, + 'signal' => $signal, + 'target' => $target + ] + ]); + } + + /** + * @param array $data + * @return ResponseMessage + * @throws JobdException + */ + public function sendSignals(array $data): ResponseMessage { + return $this->recv( + $this->sendRequest(new RequestMessage('send-signal', ['jobs' => $data])) + ); + } + } \ No newline at end of file diff --git a/src/WorkerClient.php b/src/WorkerClient.php index 2773de5..16f8963 100644 --- a/src/WorkerClient.php +++ b/src/WorkerClient.php @@ -2,6 +2,10 @@ namespace jobd; +use jobd\exceptions\JobdException; +use jobd\messages\RequestMessage; +use jobd\messages\ResponseMessage; + class WorkerClient extends Client { public function __construct(int $port = Client::WORKER_PORT, ...$args) @@ -48,11 +52,23 @@ class WorkerClient extends Client { ); } + /** + * @param array[] $jobs + * @return ResponseMessage + * @throws JobdException + */ + public function sendSignal(array $jobs): ResponseMessage + { + return $this->recv( + $this->sendRequest(new RequestMessage('send-signal', ['jobs' => $jobs])) + ); + } + /** * @param string $target * @param int $concurrency * @return ResponseMessage - * @throws Exception + * @throws JobdException */ public function addTarget(string $target, int $concurrency): ResponseMessage { @@ -67,7 +83,7 @@ class WorkerClient extends Client { /** * @param string $target * @return ResponseMessage - * @throws Exception + * @throws JobdException */ public function removeTarget(string $target): ResponseMessage { @@ -82,7 +98,7 @@ class WorkerClient extends Client { * @param string $target * @param int $concurrency * @return ResponseMessage - * @throws Exception + * @throws JobdException */ public function setTargetConcurrency(string $target, int $concurrency): ResponseMessage { diff --git a/src/exceptions/JobInterruptedException.php b/src/exceptions/JobInterruptedException.php new file mode 100644 index 0000000..c645cc5 --- /dev/null +++ b/src/exceptions/JobInterruptedException.php @@ -0,0 +1,11 @@ +