This commit is contained in:
Evgeny Zinoviev 2021-05-07 23:39:20 +03:00
commit 9a98ac50ff
21 changed files with 1255 additions and 0 deletions

2
.gitignore vendored Normal file
View File

@ -0,0 +1,2 @@
.idea
/vendor

21
LICENSE Normal file
View File

@ -0,0 +1,21 @@
MIT License
Copyright (C) 2021 Evgeny Zinoviev
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

81
README.md Normal file
View File

@ -0,0 +1,81 @@
# jobd-php-example
This repository contains example of PHP integration with jobd. The code is
mainly an excerpt from a real existent PHP application, with some changes.
To launch this example and see how it works, you will need to set up a jobd-master
instance along with two jobd worker instances.
It was written and tested on my local machine with PHP 7.3, so I'm sharing all
my configs as is. Don't forget to replace values, such as IP addresses,
usernames, passwords and so on, with yours, and generally adjust it to your needs.
## Configuration
### jobd
jobd configs are included in the repo: [`jobd-1.conf`](jobd-1.conf),
[`jobd-2.conf`](jobd-2.conf), [`jobd-master.conf`](jobd-master.conf).
### MySQL
[`schema.sql`](schema.sql) contains schema of MySQL table used in the example.
### Runtime
For the sake of simplicity, runtime configuration (such as MySQL credentials)
is stored in [`init.php`](src/init.php) as global constants. Adjust to your needs.
## Usage
1. Make sure **MySQL server** is running.
2. Start **jobd-master** and two **jobd** instances:
```
jobd-master --config jobd-master.conf
jobd --config jobd-1.conf
jobd --config jobd-2.conf
```
3. Install dependencies with composer:
```
composer install
```
4. Test configuration:
```
php src/main.php test
```
This command will test MySQL and jobd connection.
You can also print the list of workers by executing:
```
jobctl --master list-workers
```
5. Launch test jobs:
```
php src/main.php hello
```
This will launch two [`Hello`](src/jobs/Hello.php) jobs, wait for results
and print them.
6. Launch another test job. [This one](src/jobs/CreateFile.php) will run in
background. It just creates a file with the name you give it. Not like
it's anything useful, but it's for the demo.
```
php src/main.php createfile
```
Note that if the path your specify is not absolute, it will be relative to
the jobd's working directory, specified `launcher.cwd` config option.
If it fails, just look into the MySQL table, there must be some error.
## License
MIT

17
composer.json Normal file
View File

@ -0,0 +1,17 @@
{
"name": "ch1p/jobd-php-example",
"description": "example usage of jobd and it's PHP client",
"minimum-stability": "stable",
"license": "MIT",
"authors": [
{
"name": "Evgeny Zinoviev",
"email": "me@ch1p.io"
}
],
"require": {
"ch1p/jobd-client": "^1.5",
"ext-json": "*",
"ext-mysqli": "*"
}
}

59
composer.lock generated Normal file
View File

@ -0,0 +1,59 @@
{
"_readme": [
"This file locks the dependencies of your project to a known state",
"Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies",
"This file is @generated automatically"
],
"content-hash": "96d78551c8c7b7a22cb20daf8025e024",
"packages": [
{
"name": "ch1p/jobd-client",
"version": "1.5.2",
"source": {
"type": "git",
"url": "https://github.com/gch1p/php-jobd-client.git",
"reference": "a3bb10feaea28fd54866dc9c4311a247a84a202c"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/gch1p/php-jobd-client/zipball/a3bb10feaea28fd54866dc9c4311a247a84a202c",
"reference": "a3bb10feaea28fd54866dc9c4311a247a84a202c",
"shasum": ""
},
"require": {
"ext-json": "*",
"ext-sockets": "*",
"php": ">=7.0"
},
"type": "library",
"autoload": {
"psr-4": {
"jobd\\": "src/"
}
},
"notification-url": "https://packagist.org/downloads/",
"license": [
"BSD-2-Clause"
],
"keywords": [
"job",
"jobd",
"queue"
],
"support": {
"issues": "https://github.com/gch1p/php-jobd-client/issues",
"source": "https://github.com/gch1p/php-jobd-client/tree/v1.5.2"
},
"time": "2021-03-15T22:02:40+00:00"
}
],
"packages-dev": [],
"aliases": [],
"minimum-stability": "stable",
"stability-flags": [],
"prefer-stable": false,
"prefer-lowest": false,
"platform": [],
"platform-dev": [],
"plugin-api-version": "2.0.0"
}

35
jobd-1.conf Normal file
View File

@ -0,0 +1,35 @@
host = 0.0.0.0
port = 7079
; password =
name = worker-1
master_host = 127.0.0.1
master_port = 7081
master_reconnect_timeout = 10
; Don't do this! Here i put it to /tmp only because it was
; for a test. In a real world you should use something more
; appropriate, like /var/log
log_file = /tmp/jobd-1.log
log_level_file = warn
log_level_console = warn
mysql_host = 10.211.55.6
mysql_port = 3306
mysql_user = jobd
mysql_password = password
mysql_database = jobd
mysql_table = jobs2
mysql_fetch_limit = 10
launcher = php /Users/ch1p/dev/jobd-php-example/src/launcher.php {id}
launcher.cwd = /Users/ch1p/dev/jobd-php-example/src
launcher.env.LC_ALL = en_US.UTF-8
launcher.env.LANGUAGE = en_US.UTF-8
launcher.env.LANG = en_US.UTF-8
max_output_buffer = 16777216
[targets]
1/high = 10
1/low = 10
any = 5

35
jobd-2.conf Normal file
View File

@ -0,0 +1,35 @@
host = 0.0.0.0
port = 7080
; password =
name = worker-2
master_host = 127.0.0.1
master_port = 7081
master_reconnect_timeout = 10
; Don't do this! Here i put it to /tmp only because it was
; for a test. In a real world you should use something more
; appropriate, like /var/log
log_file = /tmp/jobd-2.log
log_level_file = warn
log_level_console = warn
mysql_host = 10.211.55.6
mysql_port = 3306
mysql_user = jobd
mysql_password = password
mysql_database = jobd
mysql_table = jobs2
mysql_fetch_limit = 10
launcher = php /Users/ch1p/dev/jobd-php-example/src/launcher.php {id}
launcher.cwd = /Users/ch1p/dev/jobd-php-example/src
launcher.env.LC_ALL = en_US.UTF-8
launcher.env.LANGUAGE = en_US.UTF-8
launcher.env.LANG = en_US.UTF-8
max_output_buffer = 16777216
[targets]
2/high = 10
2/low = 10
any = 5

13
jobd-master.conf Normal file
View File

@ -0,0 +1,13 @@
host = 0.0.0.0
port = 7081
; password =
ping_interval = 30
poke_throttle_interval = 0.5
; Don't do this! Here i put it to /tmp only because it was
; for a test. In a real world you should use something more
; appropriate, like /var/log
log_file = /tmp/jobd-master.log
log_level_file = warn
log_level_console = warn

17
schema.sql Normal file
View File

@ -0,0 +1,17 @@
CREATE TABLE `jobs2` (
`id` int(10) UNSIGNED NOT NULL AUTO_INCREMENT,
`target` char(32) NOT NULL,
`name` char(64) NOT NULL,
`time_created` int(10) UNSIGNED NOT NULL DEFAULT 0,
`time_started` int(10) UNSIGNED NOT NULL DEFAULT 0,
`time_finished` int(10) UNSIGNED NOT NULL DEFAULT 0,
`status` enum('waiting', 'manual', 'accepted', 'running', 'done', 'ignored') NOT NULL DEFAULT 'waiting',
`result` enum('ok', 'fail') DEFAULT NULL,
`return_code` tinyint(3) UNSIGNED DEFAULT NULL,
`sig` char(10) DEFAULT NULL,
`input` mediumtext NOT NULL,
`stdout` mediumtext NOT NULL DEFAULT '',
`stderr` mediumtext NOT NULL DEFAULT '',
PRIMARY KEY (`id`),
KEY `select_for_target_priority_idx` (`target`, `status`, `id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

51
src/classes/Job.php Normal file
View File

@ -0,0 +1,51 @@
<?php
abstract class Job extends model {
// ENUM status
const STATUS_WAITING = 'waiting';
const STATUS_MANUAL = 'manual';
const STATUS_ACCEPTED = 'accepted';
const STATUS_IGNORED = 'ignored';
const STATUS_RUNNING = 'running';
const STATUS_DONE = 'done';
// ENUM result
const RESULT_OK = 'ok';
const RESULT_FAIL = 'fail';
const DB_TABLE = 'jobs';
protected static $Fields = [
'id' => model::INTEGER,
'target' => model::STRING,
'name' => model::STRING,
'time_created' => model::INTEGER,
'time_started' => model::INTEGER,
'time_finished' => model::INTEGER,
'status' => model::STRING, // ENUM
'result' => model::STRING, // ENUM
'return_code' => model::INTEGER,
'sig' => model::STRING,
'stdout' => model::STRING,
'stderr' => model::STRING,
'input' => model::SERIALIZED,
];
public $id;
public $target;
public $name;
public $timeCreated;
public $timeStarted;
public $timeFinished;
public $status;
public $result;
public $returnCode;
public $sig;
public $stdout;
public $stderr;
public $input;
abstract public function run();
}

96
src/classes/JobResult.php Normal file
View File

@ -0,0 +1,96 @@
<?php
class JobResult {
/**
* @var string $result
*/
protected $result;
/**
* @var int $returnCode
*/
protected $returnCode;
/**
* @var string|null $signal
*/
protected $signal;
/**
* @var string $stdout
*/
protected $stdout;
/**
* @var string $stderr
*/
protected $stderr;
/**
* @param string $result
* @param int $return_code
* @param string $stdout
* @param string $stderr
* @param null $signal
* @return $this
*/
public function setResult(string $result,
int $return_code,
string $stdout,
string $stderr,
$signal = null): JobResult
{
$this->result = $result;
$this->returnCode = $return_code;
$this->stdout = $stdout;
$this->stderr = $stderr;
$this->signal = $signal;
return $this;
}
/**
* @param string $error
* @return $this
*/
public function setError(string $error): JobResult
{
$this->result = Job::RESULT_FAIL;
$this->stderr = $error;
return $this;
}
/**
* @return bool
*/
public function isFailed(): bool
{
return $this->result == Job::RESULT_FAIL;
}
/**
* @return string
*/
public function getStdout(): string
{
return $this->stdout;
}
/**
* @return mixed|null
*/
public function getStdoutAsJSON() {
$json = jsonDecode($this->stdout);
return $json ? $json : null;
}
/**
* @return string
*/
public function getError(): string {
return $this->stderr ?? '';
}
}

275
src/classes/jobs.php Normal file
View File

@ -0,0 +1,275 @@
<?php
class jobs
{
/**
* @var jobs_destructor $destructor_instance
*/
private static $destructor_instance;
/**
* @var array<int, array> $new_jobs
*/
private static $new_jobs = [];
/**
* Automatically poke master on exit.
*/
public static function destruct()
{
if (!empty(self::$new_jobs)) {
$targets = [];
foreach (self::$new_jobs as $new_job) {
if ($new_job['status'] === Job::STATUS_WAITING)
$targets[$new_job['target']] = true;
}
if (!empty($targets)) {
$targets = array_keys($targets);
self::poke($targets);
}
}
}
/**
* Create job.
*
* @param int|string $target
* @param string $name
* @param array $data
* @param string $status
* @return int|string Job ID
*/
public static function add($target, string $name, array $data = [], string $status = Job::STATUS_WAITING): int
{
if (is_null(self::$destructor_instance))
self::$destructor_instance = new jobs_destructor();
if (strpos($name, '\\') !== false) {
$pos = strrpos($name, '\\');
$name = substr($name, $pos + 1);
}
$db = getMySQL();
$db->insert(JOBD_TABLE, [
'target' => $target,
'name' => $name,
'time_created' => time(),
'input' => serialize($data),
'status' => $status
]);
$id = $db->insertId();
self::$new_jobs[$id] = [
'target' => $target,
'status' => $status
];
return $id;
}
/**
* Create manual job.
*
* @param int|string $target
* @param string $name
* @param array $data
* @return int
*/
public static function manual($target, string $name, array $data = []): int
{
return self::add($target, $name, $data, Job::STATUS_MANUAL);
}
/**
* Run jobs with given ids and status=Job::STATUS_MANUAL and wait for results.
*
* If only one job was given and it's failed, an Exception will be thrown!
* If multiple jobs were given and some of them failed, an array of JobResults will be returned.
*
* @param int|int[] $job_ids
* @return array<int, JobResult>|JobResult
* @throws Exception
*/
public static function run($job_ids)
{
if (!is_array($job_ids))
$job_ids = [$job_ids];
$job_ids_orig = $job_ids;
$job_ids = array_flip($job_ids);
$jobs = [];
// look for the given jobs in self::$new_jobs
foreach (self::$new_jobs as $id => $new_job) {
if ($new_job['status'] == Job::STATUS_MANUAL && isset($job_ids[$id])) {
$jobs[] = ['id' => $id, 'target' => $new_job['target']];
unset($job_ids[$id]);
}
}
// if some (or all) jobs were not found in self::$new_jobs, get them from the database
if (!empty($job_ids)) {
$job_ids = array_keys($job_ids);
$db = getMySQL();
$q = $db->query("SELECT id, target, status AS target FROM ".JOBD_TABLE." WHERE id IN (".implode(',', $job_ids).")");
$job_ids = array_flip($job_ids);
while ($row = $db->fetch($q)) {
// only manual jobs are allowed
if ($row['status'] != Job::STATUS_MANUAL)
throw new Exception("job id=${row['id']} has status = {$row['status']} != manual");
$jobs[] = [
'id' => (int)$row['id'],
'target' => $row['target']
];
unset($job_ids[$row['id']]);
}
$q->free();
// we were given invalid ids, it seems. throw an exception and don't continue
if (!empty($job_ids))
throw new Exception("jobs with id ".implode(', ', array_keys($job_ids))." not found");
}
// connect to master and send run-manual request
$client = getJobdMaster();
$response = $client->runManual($jobs);
// master request failed
if (($error = $response->getError()) !== null)
throw new Exception("jobd returned error: ".$error);
// at this point, jobd-master request succeeded
// doesn't mean our jobs were successfully accepted and executed by workers,
// but at least we have some results
/**
* @var array<int, JobResult> $results
*/
$results = [];
$data = $response->getData();
$client->close();
// collect results, successes and failures
if (!empty($data['jobs'])) {
foreach ($data['jobs'] as $job_id => $job_result_raw) {
$job_result = (new JobResult())->setResult(
$job_result_raw['result'],
$job_result_raw['code'],
$job_result_raw['stdout'],
$job_result_raw['stderr'],
$job_result_raw['signal']
);
$results[$job_id] = $job_result;
}
}
if (!empty($data['errors'])) {
foreach ($data['errors'] as $job_id => $job_result_raw) {
$job_result = (new JobResult())->setError($job_result_raw);
$results[$job_id] = $job_result;
}
}
// remove jobs from self::$new_jobs
foreach ($job_ids_orig as $id) {
if (isset(self::$new_jobs[$id]))
unset(self::$new_jobs[$id]);
}
// if the $job_ids arguments wasn't an array, return the JobResult instance
if (count($job_ids_orig) === 1 && count($results) === 1) {
$result = reset($results);
if ($result->isFailed())
throw new Exception($result->getError());
return $result;
}
// otherwise, return array of JobResult instances
return $results;
}
/**
* @param string|string[] $targets
*/
public static function poke($targets)
{
$client = getJobdMaster();
if (!is_array($targets))
$targets = [$targets];
$client->poke($targets);
$targets = array_flip(array_unique($targets));
// remove poked targets from self::$new_jobs to avoid meaninglessly duplicating this poke from the destructor
if (!empty(self::$new_jobs)) {
foreach (self::$new_jobs as $new_job_id => $new_job) {
if ($new_job['status'] == Job::STATUS_WAITING && isset($targets[$new_job['target']]))
unset(self::$new_jobs[$new_job_id]);
}
}
$client->close();
return true;
}
/**
* @param int $id
* @return array
*/
public static function get(int $id)
{
$db = getMySQL();
$q = $db->query("SELECT * FROM ".JOBD_TABLE." WHERE id=?", $id);
return $db->fetch($q);
}
/**
* Delete old succeeded jobs.
*/
public static function cleanup()
{
$db = getMySQL();
$db->query("DELETE FROM ".JOBD_TABLE." WHERE status='done' AND result='ok' AND time_finished < ?",
time() - 86400);
}
}
class job_target
{
const any = "any";
public static function high(int $server): string
{
return "$server/high";
}
public static function low(int $server): string
{
return "$server/low";
}
}
class jobs_destructor
{
public function __destruct()
{
jobs::destruct();
}
}

149
src/classes/model.php Normal file
View File

@ -0,0 +1,149 @@
<?php
abstract class model {
const DB_TABLE = null;
const DB_KEY = 'id';
const STRING = 0;
const INTEGER = 1;
const FLOAT = 2;
const ARRAY = 3;
const BOOLEAN = 4;
const JSON = 5;
const SERIALIZED = 6;
protected static $Fields = [];
public static function create_instance(...$args) {
$cl = get_called_class();
return new $cl(...$args);
}
public function __construct($raw) {
foreach (static::$Fields as $name => $type)
$this->{toCamelCase($name)} = self::cast_to_type($type, $raw[$name]);
if (is_null(static::DB_TABLE))
trigger_error('class '.get_class($this).' doesn\'t have DB_TABLE defined');
}
/**
* @param $fields
*/
public function edit($fields) {
$db = getMySQL();
$save = [];
foreach ($fields as $name => $value) {
switch (static::$Fields[$name]) {
case self::ARRAY:
if (is_array($value)) {
$fields[$name] = implode(',', $value);
$save[$name] = $value;
}
break;
case self::INTEGER:
$value = (int)$value;
$fields[$name] = $value;
$save[$name] = $value;
break;
case self::FLOAT:
$value = (float)$value;
$fields[$name] = $value;
$save[$name] = $value;
break;
case self::BOOLEAN:
$fields[$name] = $value ? 1 : 0;
$save[$name] = $value;
break;
case self::JSON:
$fields[$name] = jsonEncode($value);
$save[$name] = $value;
break;
case self::SERIALIZED:
$fields[$name] = serialize($value);
$save[$name] = $value;
break;
default:
$value = (string)$value;
$fields[$name] = $value;
$save[$name] = $value;
break;
}
}
if (!$db->update(static::DB_TABLE, $fields, static::DB_KEY."=?", $this->get_id())) {
//debugError(__METHOD__.': failed to update database');
return;
}
foreach ($save as $name => $value)
$this->{toCamelCase($name)} = $value;
}
/**
* @return int
*/
public function get_id() {
return $this->{toCamelCase(static::DB_KEY)};
}
/**
* @param array $fields
* @param array $custom_getters
* @return array
*/
public function as_array(array $fields = [], array $custom_getters = []) {
if (empty($fields))
$fields = array_keys(static::$Fields);
$array = [];
foreach ($fields as $field) {
if (isset($custom_getters[$field]) && is_callable($custom_getters[$field])) {
$array[$field] = $custom_getters[$field]();
} else {
$array[$field] = $this->{toCamelCase($field)};
}
}
return $array;
}
/**
* @param $type
* @param $value
* @return array|bool|false|float|int|string
*/
protected static function cast_to_type($type, $value) {
switch ($type) {
case self::BOOLEAN:
return (bool)$value;
case self::INTEGER:
return (int)$value;
case self::FLOAT:
return (float)$value;
case self::ARRAY:
return array_filter(explode(',', $value));
case self::JSON:
return jsonDecode($value);
case self::SERIALIZED:
return unserialize($value);
default:
return (string)$value;
}
}
}

166
src/classes/mysql.php Normal file
View File

@ -0,0 +1,166 @@
<?php
class mysql
{
/** @var mysqli $link */
private $link = null;
public function __construct(string $host, string $user, string $password, string $name)
{
$this->link = new mysqli();
if (!$this->link->real_connect($host, $user, $password, $name)) {
$this->link = null;
throw new Exception('Could not connect to MySQL');
}
}
public function __destruct()
{
if ($this->link)
$this->link->close();
}
public function __get($k)
{
if ($k == 'error') {
return $this->link->error;
}
return $this->$k;
}
public function query(string $sql)
{
if (func_num_args() > 1) {
$mark_count = substr_count($sql, '?');
$positions = array();
$last_pos = -1;
for ($i = 0; $i < $mark_count; $i++) {
$last_pos = strpos($sql, '?', $last_pos + 1);
$positions[] = $last_pos;
}
for ($i = $mark_count - 1; $i >= 0; $i--) {
$arg_val = func_get_arg($i + 1);
if (is_null($arg_val)) {
$v = 'NULL';
} else {
$v = '\'' . $this->escape($arg_val) . '\'';
}
$sql = substr_replace($sql, $v, $positions[$i], 1);
}
}
$q = $this->link->query($sql);
if (!$q) {
$error = $this->link->error;
trigger_error($error, E_USER_WARNING);
return false;
}
return $q;
}
public function insert(string $table, array $fields)
{
return $this->performInsert('INSERT', $table, $fields);
}
public function replace(string $table, array $fields)
{
return $this->performInsert('REPLACE', $table, $fields);
}
protected function performInsert(string $command, string $table, array $fields)
{
$names = [];
$values = [];
$count = 0;
foreach ($fields as $k => $v) {
$names[] = $k;
$values[] = $v;
$count++;
}
$sql = "{$command} INTO `{$table}` (`" . implode('`, `', $names) . "`) VALUES (" . implode(', ', array_fill(0, $count, '?')) . ")";
array_unshift($values, $sql);
return call_user_func_array([$this, 'query'], $values);
}
public function multipleInsert(string $table, array $rows)
{
return $this->performMultipleInsert('INSERT', $table, $rows);
}
public function multipleReplace(string $table, array $rows)
{
return $this->performMultipleInsert('REPLACE', $table, $rows);
}
protected function performMultipleInsert(string $command, string $table, array $rows)
{
$names = [];
$sql_rows = [];
foreach ($rows as $i => $fields) {
$row_values = [];
foreach ($fields as $field_name => $field_val) {
if ($i == 0) {
$names[] = $field_name;
}
$row_values[] = $this->escape($field_val);
}
$sql_rows[] = "('" . implode("', '", $row_values) . "')";
}
$sql = "{$command} INTO `{$table}` (`" . implode('`, `', $names) . "`) VALUES " . implode(', ', $sql_rows);
return $this->query($sql);
}
public function update(string $table, arrow $rows, string ...$cond)
{
$fields = [];
$args = [];
foreach ($rows as $row_name => $row_value) {
$fields[] = "`{$row_name}`=?";
$args[] = $row_value;
}
$sql = "UPDATE `$table` SET " . implode(', ', $fields);
if (!empty($cond)) {
$sql .= " WHERE " . $cond[0];
if (count($cond) > 1)
$args = array_merge($args, array_slice($cond, 1));
}
return $this->query($sql, ...$args);
}
public function fetch(mysqli_result $q)
{
$row = $q->fetch_assoc();
if (!$row) {
$q->free();
return false;
}
return $row;
}
public function result(mysqli_result $q, int $field = 0)
{
return $q ? $q->fetch_row()[$field] : false;
}
public function insertId()
{
return $this->link->insert_id;
}
public function numRows(mysqli_result $query): int
{
return $query->num_rows;
}
public function escape(string $s): string
{
return $this->link->real_escape_string($s);
}
}

View File

@ -0,0 +1,5 @@
<?php
require __DIR__.'/../init.php';
jobs::cleanup();

47
src/functions.php Normal file
View File

@ -0,0 +1,47 @@
<?php
function jsonEncode($obj) {
return json_encode($obj, JSON_UNESCAPED_UNICODE);
}
function jsonDecode($json) {
return json_decode($json, true);
}
function toCamelCase(string $input, string $separator = '_'): string {
return lcfirst(str_replace($separator, '', ucwords($input, $separator)));
}
/* Connection helpers */
function getMySQL(): mysql {
static $link = null;
if (is_null($link))
$link = new mysql(MYSQL_HOST, MYSQL_USER, MYSQL_PASSWORD, MYSQL_DB);
return $link;
}
function getJobdMaster(): jobd\MasterClient {
return new jobd\MasterClient(JOBD_PORT, JOBD_HOST, JOBD_PASSWORD);
}
/* Command line helpers */
function green(string $s): string {
return "\033[32m$s\033[0m";
}
function yellow(string $s): string {
return "\033[33m$s\033[0m";
}
function red(string $s): string {
return "\033[31m$s\033[0m";
}
function input(string $prompt): string {
echo $prompt;
return substr(fgets(STDIN), 0, -1);
}

32
src/init.php Normal file
View File

@ -0,0 +1,32 @@
<?php
require __DIR__.'/../vendor/autoload.php';
error_reporting(E_ALL);
ini_set('display_errors', 1);
define('MYSQL_HOST', '10.211.55.6');
define('MYSQL_USER', 'jobd');
define('MYSQL_PASSWORD', 'password');
define('MYSQL_DB', 'jobd');
define('JOBD_TABLE', 'jobs2');
define('JOBD_HOST', '127.0.0.1');
define('JOBD_PORT', jobd\Client::MASTER_PORT);
define('JOBD_PASSWORD', '');
spl_autoload_register(function($class) {
if (strpos($class, '\\') !== false) {
$class = str_replace('\\', '/', $class);
$root = __DIR__;
} else {
$root = __DIR__.'/classes';
}
$path = $root.'/'.$class.'.php';
if (is_file($path))
require_once $path;
});
include __DIR__.'/functions.php';

15
src/jobs/CreateFile.php Normal file
View File

@ -0,0 +1,15 @@
<?php
namespace jobs;
class CreateFile extends \Job
{
public function run()
{
$file = $this->input['file'];
if (!touch($file))
throw new \Exception("failed to touch file '".$file."'");
}
}

15
src/jobs/Hello.php Normal file
View File

@ -0,0 +1,15 @@
<?php
namespace jobs;
class Hello extends \Job
{
public function run()
{
$greetings = "Hello, ".($this->input['name'] ?? 'noname').".\n";
$greetings .= "I'm writing you from ".__METHOD__.", my PID is ".getmypid()." and I'm executing job #".$this->id.".";
echo jsonEncode(['response' => $greetings]);
}
}

31
src/launcher.php Normal file
View File

@ -0,0 +1,31 @@
<?php
require_once __DIR__.'/init.php';
set_time_limit(0);
$job = null;
register_shutdown_function(function() {
global $job;
if ($job !== true)
exit(1);
});
$job_id = $argv[1] ?? null;
$job_raw = jobs::get($job_id);
if (!$job_raw)
throw new InvalidArgumentException("job $job_id not found");
$class_name = "jobs\\{$job_raw['name']}";
$job = new $class_name($job_raw);
if ($job->status != Job::STATUS_RUNNING)
throw new RuntimeException("job status is {$job->status}");
try {
if ($job->run() !== false)
$job = true;
} catch (Exception $e) {
fprintf(STDERR, $e.'');
exit(1);
}

93
src/main.php Normal file
View File

@ -0,0 +1,93 @@
<?php
require __DIR__.'/init.php';
if ($argc < 2) {
echo <<<EOF
Usage: {$argv[0]} COMMAND
Commands:
test
hello
createfile
EOF;
exit;
}
$cmd = $argv[1];
$func = "cmd_{$cmd}";
if (!function_exists($func)) {
echo red("command '".$cmd."' is not implement")."\n";
exit(1);
}
call_user_func($func);
/** Commands */
function cmd_test() {
// MySQL
try {
$db = getMySQL();
$jobs_count = $db->result($db->query("SELECT COUNT(*) FROM ".JOBD_TABLE));
} catch (Exception $e) {
echo red("MySQL connection failed")."\n";
exit(1);
}
echo green("MySQL OK")."\n";
// jobd
try {
$jobd = getJobdMaster();
$status = $jobd->status(true);
$workers_count = count($status->getData()['workers']);
if ($workers_count == 2) {
echo green("jobd-master and jobd OK");
} else {
$message = "jobd-master OK, but ";
$message .= $workers_count == 1 ? "only 1 worker is connected" : "no workers are connected";
echo yellow($message);
}
echo "\n";
} catch (Exception $e) {
echo red("jobd-master connection failed: ".$e->getMessage())."\n";
exit(1);
}
}
function cmd_hello() {
$myname = input('Enter your name: ');
try {
$job_ids = [];
$job_server_map = [];
for ($server = 1; $server <= 2; $server++) {
$id = jobs::manual(job_target::high($server), jobs\Hello::class, ['name' => $myname]);
$job_server_map[$id] = $server;
$job_ids[] = $id;
}
$results = jobs::run($job_ids);
foreach ($results as $job_id => $job_result) {
$server = $job_server_map[$job_id];
echo "> server {$server}:\n";
if ($job_result->isFailed()) {
echo red("failed")."\n";
} else {
echo green($job_result->getStdoutAsJSON()['response'])."\n";
}
echo "\n";
}
} catch (Exception $e) {
echo red("error: ".$e->getMessage())."\n";
exit(1);
}
}
function cmd_createfile() {
$file = input('Enter file name: ');
jobs::add(job_target::any, jobs\CreateFile::class, ['file' => $file]);
}