signals support

This commit is contained in:
Evgeny Zinoviev 2023-04-03 13:54:30 +03:00
parent 0775fb2439
commit a6bdd77f06
8 changed files with 225 additions and 114 deletions

View File

@ -609,6 +609,12 @@ Here is the list of supported requests, using `type(arguments)` notation.
An object whose keys represent failed job IDs and whose values are error
messages.
* ##### `send-signal(jobs: object<id: int, signal: int>)`
Send signals to jobs which are still executing and return results.
Response [data](#data-array--object--string--int) type: **object** with job IDs as keys and
kill status (boolean where true means that signal is successfully delivered) as values.
#### jobd-master requests
* ##### `register-worker(targets: string[], name: string)`
@ -636,6 +642,10 @@ Here is the list of supported requests, using `type(arguments)` notation.
Send [`run-manual()`](#run-manualids-int) requests to registered jobd instances
serving specified targets, aggregate and return results.
* ##### `send-signal(jobs: {id: int, signal: int, target: string}[])`
Send [`send-signal()`](#send-signal-jobs) requests to registered jobd instances
serving specified targets, aggregate and return results.
### Response Message
`DATA` is a JSON object with following keys:
@ -665,7 +675,8 @@ Example (w/o trailing `EOT`):
## TODO
- graceful shutdown
- graceful shutdown of jobd
- support signals in jobctl
## License

View File

@ -14,7 +14,7 @@ log_level_file = info
log_level_console = debug
; mysql settings
mysql_host = 10.211.55.6
mysql_host = 127.0.0.1
mysql_port = 3306
mysql_user = jobd
mysql_password = password

162
package-lock.json generated
View File

@ -1,11 +1,12 @@
{
"name": "jobd",
"version": "1.11.0",
"version": "1.12.0",
"lockfileVersion": 2,
"requires": true,
"packages": {
"": {
"version": "1.11.0",
"name": "jobd",
"version": "1.12.0",
"license": "BSD-2-Clause",
"os": [
"darwin",
@ -15,8 +16,8 @@
"columnify": "^1.5.4",
"ini": "^2.0.0",
"lodash": "^4.17.21",
"log4js": "^6.3.0",
"minimist": "^1.2.5",
"log4js": "^6.4.0",
"minimist": "^1.2.8",
"mysql": "^2.18.1",
"promise-mysql": "^5.0.2",
"queue": "^6.0.2"
@ -92,22 +93,27 @@
"integrity": "sha1-tf1UIgqivFq1eqtxQMlAdUUDwac="
},
"node_modules/date-format": {
"version": "3.0.0",
"resolved": "https://registry.npmjs.org/date-format/-/date-format-3.0.0.tgz",
"integrity": "sha512-eyTcpKOcamdhWJXj56DpQMo1ylSQpcGtGKXcU0Tb97+K56/CF5amAqqqNj0+KvA0iw2ynxtHWFsPDSClCxe48w==",
"version": "4.0.14",
"resolved": "https://registry.npmjs.org/date-format/-/date-format-4.0.14.tgz",
"integrity": "sha512-39BOQLs9ZjKh0/patS9nrT8wc3ioX3/eA/zgbKNopnF2wCqJEoxywwwElATYvRsXdnOxA/OQeQoFZ3rFjVajhg==",
"engines": {
"node": ">=4.0"
}
},
"node_modules/debug": {
"version": "4.3.1",
"resolved": "https://registry.npmjs.org/debug/-/debug-4.3.1.tgz",
"integrity": "sha512-doEwdvm4PCeK4K3RQN2ZC2BYUBaxwLARCqZmMjtF8a51J2Rb0xpVloFRnCODwqjpwnAoao4pelN8l3RJdv3gRQ==",
"version": "4.3.4",
"resolved": "https://registry.npmjs.org/debug/-/debug-4.3.4.tgz",
"integrity": "sha512-PRWFHuSU3eDtQJPvnNY7Jcket1j0t5OuOsFzPPzsekD52Zl8qUfFIPEiswXqIvHWGVHOgX+7G/vCNNhehwxfkQ==",
"dependencies": {
"ms": "2.1.2"
},
"engines": {
"node": ">=6.0"
},
"peerDependenciesMeta": {
"supports-color": {
"optional": true
}
}
},
"node_modules/defaults": {
@ -119,9 +125,9 @@
}
},
"node_modules/flatted": {
"version": "2.0.2",
"resolved": "https://registry.npmjs.org/flatted/-/flatted-2.0.2.tgz",
"integrity": "sha512-r5wGx7YeOwNWNlCA0wQ86zKyDLMQr+/RB8xy74M4hTphfmjlijTSSXGuH8rnvKZnfT9i+75zmd8jcKdMR4O6jA=="
"version": "3.2.7",
"resolved": "https://registry.npmjs.org/flatted/-/flatted-3.2.7.tgz",
"integrity": "sha512-5nqDSxl8nn5BSNxyR3n4I6eDmbolI6WT+QqR547RwxQapgjQBmtktdP+HTBb/a/zLsbzERTONyUB5pefh5TtjQ=="
},
"node_modules/fs-extra": {
"version": "8.1.0",
@ -137,9 +143,9 @@
}
},
"node_modules/graceful-fs": {
"version": "4.2.6",
"resolved": "https://registry.npmjs.org/graceful-fs/-/graceful-fs-4.2.6.tgz",
"integrity": "sha512-nTnJ528pbqxYanhpDYsi4Rd8MAeaBA67+RZ10CM1m3bTAVFEDcd5AuA4a6W5YkGZ1iNXHzZz8T6TBKLeBuNriQ=="
"version": "4.2.11",
"resolved": "https://registry.npmjs.org/graceful-fs/-/graceful-fs-4.2.11.tgz",
"integrity": "sha512-RbJ5/jmFcNNCcDV5o9eTnBLJ/HszWV0P73bc+Ff4nS/rJj+YaS6IGyiOL0VoBYX+l1Wrl3k63h/KrH+nhJ0XvQ=="
},
"node_modules/inherits": {
"version": "2.0.4",
@ -162,8 +168,8 @@
"node_modules/jsonfile": {
"version": "4.0.0",
"resolved": "https://registry.npmjs.org/jsonfile/-/jsonfile-4.0.0.tgz",
"integrity": "sha1-h3Gq4HmbZAdrdmQPygWPnBDjPss=",
"dependencies": {
"integrity": "sha512-m6F1R3z8jjlf2imQHS2Qez5sjKWQzbuuhuJ/FKYFRZvPE3PuHcSMVZzfsLhGVOkfd20obL5SWEBew5ShlquNxg==",
"optionalDependencies": {
"graceful-fs": "^4.1.6"
}
},
@ -173,24 +179,27 @@
"integrity": "sha512-v2kDEe57lecTulaDIuNTPy3Ry4gLGJ6Z1O3vE1krgXZNrsQ+LFTGHVxVjcXPs17LhbZVGedAJv8XZ1tvj5FvSg=="
},
"node_modules/log4js": {
"version": "6.3.0",
"resolved": "https://registry.npmjs.org/log4js/-/log4js-6.3.0.tgz",
"integrity": "sha512-Mc8jNuSFImQUIateBFwdOQcmC6Q5maU0VVvdC2R6XMb66/VnT+7WS4D/0EeNMZu1YODmJe5NIn2XftCzEocUgw==",
"version": "6.9.1",
"resolved": "https://registry.npmjs.org/log4js/-/log4js-6.9.1.tgz",
"integrity": "sha512-1somDdy9sChrr9/f4UlzhdaGfDR2c/SaD2a4T7qEkG4jTS57/B3qmnjLYePwQ8cqWnUHZI0iAKxMBpCZICiZ2g==",
"dependencies": {
"date-format": "^3.0.0",
"debug": "^4.1.1",
"flatted": "^2.0.1",
"rfdc": "^1.1.4",
"streamroller": "^2.2.4"
"date-format": "^4.0.14",
"debug": "^4.3.4",
"flatted": "^3.2.7",
"rfdc": "^1.3.0",
"streamroller": "^3.1.5"
},
"engines": {
"node": ">=8.0"
}
},
"node_modules/minimist": {
"version": "1.2.5",
"resolved": "https://registry.npmjs.org/minimist/-/minimist-1.2.5.tgz",
"integrity": "sha512-FM9nNUYrRBAELZQT3xeZQ7fmMOBg6nWNmJKTcgsJeaLstP/UODVpGsr5OhXhhXg6f+qtJ8uiZ+PUxkDWcgIXLw=="
"version": "1.2.8",
"resolved": "https://registry.npmjs.org/minimist/-/minimist-1.2.8.tgz",
"integrity": "sha512-2yyAR8qBkN3YuheJanUpWC5U3bb5osDywNB8RzDVlDwDHbocAJveqqj1u8+SVD7jkWT4yvsHCpWqqWqAxb0zCA==",
"funding": {
"url": "https://github.com/sponsors/ljharb"
}
},
"node_modules/ms": {
"version": "2.1.2",
@ -250,9 +259,9 @@
}
},
"node_modules/rfdc": {
"version": "1.2.0",
"resolved": "https://registry.npmjs.org/rfdc/-/rfdc-1.2.0.tgz",
"integrity": "sha512-ijLyszTMmUrXvjSooucVQwimGUk84eRcmCuLV8Xghe3UO85mjUtRAHRyoMM6XtyqbECaXuBWx18La3523sXINA=="
"version": "1.3.0",
"resolved": "https://registry.npmjs.org/rfdc/-/rfdc-1.3.0.tgz",
"integrity": "sha512-V2hovdzFbOi77/WajaSMXk2OLm+xNIeQdMMuB7icj7bk6zi2F8GGAxigcnDFpJHbNyNcgyJDiP+8nOrY5cZGrA=="
},
"node_modules/safe-buffer": {
"version": "5.1.2",
@ -268,26 +277,18 @@
}
},
"node_modules/streamroller": {
"version": "2.2.4",
"resolved": "https://registry.npmjs.org/streamroller/-/streamroller-2.2.4.tgz",
"integrity": "sha512-OG79qm3AujAM9ImoqgWEY1xG4HX+Lw+yY6qZj9R1K2mhF5bEmQ849wvrb+4vt4jLMLzwXttJlQbOdPOQVRv7DQ==",
"version": "3.1.5",
"resolved": "https://registry.npmjs.org/streamroller/-/streamroller-3.1.5.tgz",
"integrity": "sha512-KFxaM7XT+irxvdqSP1LGLgNWbYN7ay5owZ3r/8t77p+EtSUAfUgtl7be3xtqtOmGUl9K9YPO2ca8133RlTjvKw==",
"dependencies": {
"date-format": "^2.1.0",
"debug": "^4.1.1",
"date-format": "^4.0.14",
"debug": "^4.3.4",
"fs-extra": "^8.1.0"
},
"engines": {
"node": ">=8.0"
}
},
"node_modules/streamroller/node_modules/date-format": {
"version": "2.1.0",
"resolved": "https://registry.npmjs.org/date-format/-/date-format-2.1.0.tgz",
"integrity": "sha512-bYQuGLeFxhkxNOF3rcMtiZxvCBAquGzZm6oWA1oZ0g2THUzivaRhv8uOhdr19LmoobSOLoIAxeUK2RdbM8IFTA==",
"engines": {
"node": ">=4.0"
}
},
"node_modules/string_decoder": {
"version": "1.1.1",
"resolved": "https://registry.npmjs.org/string_decoder/-/string_decoder-1.1.1.tgz",
@ -383,14 +384,14 @@
"integrity": "sha1-tf1UIgqivFq1eqtxQMlAdUUDwac="
},
"date-format": {
"version": "3.0.0",
"resolved": "https://registry.npmjs.org/date-format/-/date-format-3.0.0.tgz",
"integrity": "sha512-eyTcpKOcamdhWJXj56DpQMo1ylSQpcGtGKXcU0Tb97+K56/CF5amAqqqNj0+KvA0iw2ynxtHWFsPDSClCxe48w=="
"version": "4.0.14",
"resolved": "https://registry.npmjs.org/date-format/-/date-format-4.0.14.tgz",
"integrity": "sha512-39BOQLs9ZjKh0/patS9nrT8wc3ioX3/eA/zgbKNopnF2wCqJEoxywwwElATYvRsXdnOxA/OQeQoFZ3rFjVajhg=="
},
"debug": {
"version": "4.3.1",
"resolved": "https://registry.npmjs.org/debug/-/debug-4.3.1.tgz",
"integrity": "sha512-doEwdvm4PCeK4K3RQN2ZC2BYUBaxwLARCqZmMjtF8a51J2Rb0xpVloFRnCODwqjpwnAoao4pelN8l3RJdv3gRQ==",
"version": "4.3.4",
"resolved": "https://registry.npmjs.org/debug/-/debug-4.3.4.tgz",
"integrity": "sha512-PRWFHuSU3eDtQJPvnNY7Jcket1j0t5OuOsFzPPzsekD52Zl8qUfFIPEiswXqIvHWGVHOgX+7G/vCNNhehwxfkQ==",
"requires": {
"ms": "2.1.2"
}
@ -404,9 +405,9 @@
}
},
"flatted": {
"version": "2.0.2",
"resolved": "https://registry.npmjs.org/flatted/-/flatted-2.0.2.tgz",
"integrity": "sha512-r5wGx7YeOwNWNlCA0wQ86zKyDLMQr+/RB8xy74M4hTphfmjlijTSSXGuH8rnvKZnfT9i+75zmd8jcKdMR4O6jA=="
"version": "3.2.7",
"resolved": "https://registry.npmjs.org/flatted/-/flatted-3.2.7.tgz",
"integrity": "sha512-5nqDSxl8nn5BSNxyR3n4I6eDmbolI6WT+QqR547RwxQapgjQBmtktdP+HTBb/a/zLsbzERTONyUB5pefh5TtjQ=="
},
"fs-extra": {
"version": "8.1.0",
@ -419,9 +420,9 @@
}
},
"graceful-fs": {
"version": "4.2.6",
"resolved": "https://registry.npmjs.org/graceful-fs/-/graceful-fs-4.2.6.tgz",
"integrity": "sha512-nTnJ528pbqxYanhpDYsi4Rd8MAeaBA67+RZ10CM1m3bTAVFEDcd5AuA4a6W5YkGZ1iNXHzZz8T6TBKLeBuNriQ=="
"version": "4.2.11",
"resolved": "https://registry.npmjs.org/graceful-fs/-/graceful-fs-4.2.11.tgz",
"integrity": "sha512-RbJ5/jmFcNNCcDV5o9eTnBLJ/HszWV0P73bc+Ff4nS/rJj+YaS6IGyiOL0VoBYX+l1Wrl3k63h/KrH+nhJ0XvQ=="
},
"inherits": {
"version": "2.0.4",
@ -441,7 +442,7 @@
"jsonfile": {
"version": "4.0.0",
"resolved": "https://registry.npmjs.org/jsonfile/-/jsonfile-4.0.0.tgz",
"integrity": "sha1-h3Gq4HmbZAdrdmQPygWPnBDjPss=",
"integrity": "sha512-m6F1R3z8jjlf2imQHS2Qez5sjKWQzbuuhuJ/FKYFRZvPE3PuHcSMVZzfsLhGVOkfd20obL5SWEBew5ShlquNxg==",
"requires": {
"graceful-fs": "^4.1.6"
}
@ -452,21 +453,21 @@
"integrity": "sha512-v2kDEe57lecTulaDIuNTPy3Ry4gLGJ6Z1O3vE1krgXZNrsQ+LFTGHVxVjcXPs17LhbZVGedAJv8XZ1tvj5FvSg=="
},
"log4js": {
"version": "6.3.0",
"resolved": "https://registry.npmjs.org/log4js/-/log4js-6.3.0.tgz",
"integrity": "sha512-Mc8jNuSFImQUIateBFwdOQcmC6Q5maU0VVvdC2R6XMb66/VnT+7WS4D/0EeNMZu1YODmJe5NIn2XftCzEocUgw==",
"version": "6.9.1",
"resolved": "https://registry.npmjs.org/log4js/-/log4js-6.9.1.tgz",
"integrity": "sha512-1somDdy9sChrr9/f4UlzhdaGfDR2c/SaD2a4T7qEkG4jTS57/B3qmnjLYePwQ8cqWnUHZI0iAKxMBpCZICiZ2g==",
"requires": {
"date-format": "^3.0.0",
"debug": "^4.1.1",
"flatted": "^2.0.1",
"rfdc": "^1.1.4",
"streamroller": "^2.2.4"
"date-format": "^4.0.14",
"debug": "^4.3.4",
"flatted": "^3.2.7",
"rfdc": "^1.3.0",
"streamroller": "^3.1.5"
}
},
"minimist": {
"version": "1.2.5",
"resolved": "https://registry.npmjs.org/minimist/-/minimist-1.2.5.tgz",
"integrity": "sha512-FM9nNUYrRBAELZQT3xeZQ7fmMOBg6nWNmJKTcgsJeaLstP/UODVpGsr5OhXhhXg6f+qtJ8uiZ+PUxkDWcgIXLw=="
"version": "1.2.8",
"resolved": "https://registry.npmjs.org/minimist/-/minimist-1.2.8.tgz",
"integrity": "sha512-2yyAR8qBkN3YuheJanUpWC5U3bb5osDywNB8RzDVlDwDHbocAJveqqj1u8+SVD7jkWT4yvsHCpWqqWqAxb0zCA=="
},
"ms": {
"version": "2.1.2",
@ -523,9 +524,9 @@
}
},
"rfdc": {
"version": "1.2.0",
"resolved": "https://registry.npmjs.org/rfdc/-/rfdc-1.2.0.tgz",
"integrity": "sha512-ijLyszTMmUrXvjSooucVQwimGUk84eRcmCuLV8Xghe3UO85mjUtRAHRyoMM6XtyqbECaXuBWx18La3523sXINA=="
"version": "1.3.0",
"resolved": "https://registry.npmjs.org/rfdc/-/rfdc-1.3.0.tgz",
"integrity": "sha512-V2hovdzFbOi77/WajaSMXk2OLm+xNIeQdMMuB7icj7bk6zi2F8GGAxigcnDFpJHbNyNcgyJDiP+8nOrY5cZGrA=="
},
"safe-buffer": {
"version": "5.1.2",
@ -538,20 +539,13 @@
"integrity": "sha1-R1OT/56RR5rqYtyvDKPRSYOn+0A="
},
"streamroller": {
"version": "2.2.4",
"resolved": "https://registry.npmjs.org/streamroller/-/streamroller-2.2.4.tgz",
"integrity": "sha512-OG79qm3AujAM9ImoqgWEY1xG4HX+Lw+yY6qZj9R1K2mhF5bEmQ849wvrb+4vt4jLMLzwXttJlQbOdPOQVRv7DQ==",
"version": "3.1.5",
"resolved": "https://registry.npmjs.org/streamroller/-/streamroller-3.1.5.tgz",
"integrity": "sha512-KFxaM7XT+irxvdqSP1LGLgNWbYN7ay5owZ3r/8t77p+EtSUAfUgtl7be3xtqtOmGUl9K9YPO2ca8133RlTjvKw==",
"requires": {
"date-format": "^2.1.0",
"debug": "^4.1.1",
"date-format": "^4.0.14",
"debug": "^4.3.4",
"fs-extra": "^8.1.0"
},
"dependencies": {
"date-format": {
"version": "2.1.0",
"resolved": "https://registry.npmjs.org/date-format/-/date-format-2.1.0.tgz",
"integrity": "sha512-bYQuGLeFxhkxNOF3rcMtiZxvCBAquGzZm6oWA1oZ0g2THUzivaRhv8uOhdr19LmoobSOLoIAxeUK2RdbM8IFTA=="
}
}
},
"string_decoder": {

View File

@ -1,6 +1,6 @@
{
"name": "jobd",
"version": "1.11.0",
"version": "1.12.0",
"description": "job queue daemon",
"main": "src/jobd",
"homepage": "https://github.com/gch1p/jobd#readme",
@ -18,7 +18,7 @@
},
"keywords": [],
"author": "Evgeny Zinoviev",
"license": "BSD-2-Clause",
"license": "MIT",
"os": [
"darwin",
"linux"
@ -30,8 +30,8 @@
"columnify": "^1.5.4",
"ini": "^2.0.0",
"lodash": "^4.17.21",
"log4js": "^6.3.0",
"minimist": "^1.2.5",
"log4js": "^6.4.0",
"minimist": "^1.2.8",
"mysql": "^2.18.1",
"promise-mysql": "^5.0.2",
"queue": "^6.0.2"

View File

@ -114,6 +114,7 @@ function initRequestHandler() {
requestHandler.set('run-manual', onRunManual)
requestHandler.set('pause', onPause)
requestHandler.set('continue', onContinue)
requestHandler.set('send-signal', onSendSignal)
}
function usage() {
@ -223,3 +224,25 @@ function onContinue(data, requestNo, connection) {
return 'ok'
}
/**
* @param {object} data
* @return {Promise<*>}
*/
async function onSendSignal(data) {
const {jobs} = data
if (!Array.isArray(jobs))
throw new Error('jobs must be array')
for (let job of jobs) {
validateObjectSchema(job, [
// name // type // required
['id', 'i', true],
['signal', 'i', true],
['target', 's', true],
])
}
return await workers.sendSignals(jobs)
}

View File

@ -109,7 +109,10 @@ async function initApp(appName) {
})
logger = loggerModule.getLogger(appName)
process.title = appName
let processTitle = `${appName}`
if (config.get('name'))
processTitle += ` ${config.get('name')}`
process.title = processTitle
}
function initWorker() {
@ -141,6 +144,7 @@ function initRequestHandler() {
requestHandler.set('poll', onPollRequest)
requestHandler.set('status', onStatus)
requestHandler.set('run-manual', onRunManual)
requestHandler.set('send-signal', onSendSignal)
requestHandler.set('pause', onPause)
requestHandler.set('continue', onContinue)
requestHandler.set('add-target', onAddTarget)
@ -345,6 +349,18 @@ async function onRunManual(data) {
return P
}
async function onSendSignal(data) {
const {jobs: jobToSignalMap} = data
const results = {}
for (const id in jobToSignalMap) {
if (!jobToSignalMap.hasOwnProperty(id))
continue
const signal = jobToSignalMap[id]
results[id] = worker.killJobProcess(id, signal)
}
return results
}
/**
* @param {{targets: string[]}} data
*/

View File

@ -44,6 +44,11 @@ class Worker extends EventEmitter {
* @type {Logger}
*/
this.logger = getLogger('Worker')
/**
* @type {{}}
*/
this.runningProcesses = {}
}
/**
@ -480,6 +485,7 @@ class Worker extends EventEmitter {
cwd,
env
})
this.runningProcesses[id] = process
let stdoutChunks = []
let stderrChunks = []
@ -490,6 +496,8 @@ class Worker extends EventEmitter {
* @param {null|string} signal
*/
(code, signal) => {
delete this.runningProcesses[id]
let stdout = stdoutChunks.join('')
let stderr = stderrChunks.join('')
@ -505,6 +513,7 @@ class Worker extends EventEmitter {
})
process.on('error', (error) => {
delete this.runningProcesses[id]
reject(error)
})
@ -601,6 +610,22 @@ class Worker extends EventEmitter {
}
}
/**
* @param {number} id
* @param {number} signal
* @return {boolean}
*/
killJobProcess(id, signal) {
if (this.runningProcesses[id] !== undefined) {
try {
return this.runningProcesses[id].kill(signal)
} catch (error) {
this.logger.error(`killJobProcess(${id}, ${signal})`, error)
}
}
return false
}
}
module.exports = {

View File

@ -3,6 +3,18 @@ const config = require('./config')
const {getLogger} = require('./logger')
const {RequestMessage, PingMessage} = require('./server')
const MANUAL_CALL_TYPE_RUN = 0
const MANUAL_CALL_TYPE_SIGNALS = 1
function validateManualCallType(type) {
if (![
MANUAL_CALL_TYPE_RUN,
MANUAL_CALL_TYPE_SIGNALS
].includes(type)) {
throw new Error('invalid manual call type')
}
}
class WorkersList {
constructor() {
@ -190,8 +202,9 @@ class WorkersList {
* @param {{id: int, target: string}[]} jobs
* @return {Promise<{jobs: {}, errors: {}}>}
*/
async runManual(jobs) {
this.logger.debug('runManual:', jobs)
async _runManualCall(callType, jobs) {
validateManualCallType(callType)
this.logger.debug(`runManualCall[${callType}]:`, jobs)
const workers = [...this.workers]
@ -211,7 +224,7 @@ class WorkersList {
}
}
this.logger.trace('runManual: targetWorkers:', targetWorkers)
this.logger.trace(`runManualCall[${callType}]: targetWorkers:`, targetWorkers)
/**
* List of job IDs with unsupported targets.
@ -219,10 +232,6 @@ class WorkersList {
* @type {int[]}
*/
const exceptions = []
/**
* @type {object.<int, int[]>}
*/
const callMap = {}
/**
@ -246,11 +255,11 @@ class WorkersList {
if (callMap[workerIndex] === undefined)
callMap[workerIndex] = []
callMap[workerIndex].push(id)
callMap[workerIndex].push(job)
}
this.logger.trace('runManual: callMap:', callMap)
this.logger.trace('runManual: exceptions:', exceptions)
this.logger.trace(`runManualCall[${callType}]: callMap:`, callMap)
this.logger.trace(`runManualCall[${callType}]: exceptions:`, exceptions)
/**
* @type {Promise[]}
@ -266,23 +275,38 @@ class WorkersList {
if (!callMap.hasOwnProperty(workerIndex))
continue
let workerJobIds = callMap[workerIndex]
let workerJobsData = callMap[workerIndex]
let worker = workers[workerIndex]
let conn = worker.connection
let P = conn.sendRequest(
new RequestMessage('run-manual', {ids: workerJobIds})
)
let P
switch (callType) {
case MANUAL_CALL_TYPE_RUN:
P = conn.sendRequest(
new RequestMessage('run-manual', {ids: workerJobsData.map(j => j.id)})
)
break
case MANUAL_CALL_TYPE_SIGNALS:
const data = {}
for (let jobData of workerJobsData)
data[jobData.id] = jobData.signal
P = conn.sendRequest(
new RequestMessage('send-signal', {jobs: data})
)
break
}
promises.push(P)
jobsByPromise.push(workerJobIds)
jobsByPromise.push(workerJobsData.map(j => j.id))
}
this.logger.trace('runManual: jobsByPromise:', jobsByPromise)
this.logger.trace(`runManualCall[${callType}]: jobsByPromise:`, jobsByPromise)
const results = await Promise.allSettled(promises)
this.logger.trace('runManual: Promise.allSettled results:', results)
this.logger.trace(`runManualCall[${callType}]: Promise.allSettled results:`, results)
const response = {}
const setError = (id, value) => {
@ -314,14 +338,24 @@ class WorkersList {
*/
const responseMessage = result.value
const {jobs, errors} = responseMessage.data
this.logger.trace(`[${i}]:`, jobs, errors)
switch (callType) {
case MANUAL_CALL_TYPE_RUN:
const {jobs, errors} = responseMessage.data
this.logger.trace(`[${i}]:`, jobs, errors)
if (jobs)
setData(jobs)
if (jobs)
setData(jobs)
if (errors)
setError(errors)
break
case MANUAL_CALL_TYPE_SIGNALS:
Object.assign(response, responseMessage.data)
break
}
if (errors)
setError(errors)
} else if (result.status === 'rejected') {
for (let jobIds of jobsByPromise[i]) {
@ -340,6 +374,14 @@ class WorkersList {
return response
}
async runManual(jobs) {
return await this._runManualCall(MANUAL_CALL_TYPE_RUN, jobs)
}
async sendSignals(jobs) {
return await this._runManualCall(MANUAL_CALL_TYPE_SIGNALS, jobs)
}
/**
* @param {null|string[]} targets
*/