jobd-master: support pause()/continue()

This commit is contained in:
Evgeny Zinoviev 2021-03-03 02:16:38 +03:00
parent c497fd50e8
commit 03cda643ad
2 changed files with 121 additions and 3 deletions

View File

@ -104,6 +104,8 @@ function initRequestHandler() {
requestHandler.set('register-worker', onRegisterWorker)
requestHandler.set('status', onStatus)
requestHandler.set('run-manual', onRunManual)
requestHandler.set('pause', onPause)
requestHandler.set('continue', onContinue)
}
/**
@ -221,6 +223,73 @@ async function onRunManual(data, requestNo, connection) {
)
}
/**
* @param {object} data
* @param {number} requestNo
* @param {Connection} connection
*/
function onPause(data, requestNo, connection) {
let targets
if ((targets = validateInputTargets(data, requestNo, connection)) === false)
return
workers.pauseTargets(targets)
connection.send(
new ResponseMessage(requestNo)
.setData('ok')
)
}
/**
* @param {object} data
* @param {number} requestNo
* @param {Connection} connection
*/
function onContinue(data, requestNo, connection) {
let targets
if ((targets = validateInputTargets(data, requestNo, connection)) === false)
return
workers.continueTargets(targets)
connection.send(
new ResponseMessage(requestNo)
.setData('ok')
)
}
/**
* @private
* @param data
* @param requestNo
* @param connection
* @return {null|boolean|string[]}
*/
function validateInputTargets(data, requestNo, connection) {
// null means all targets
let targets = null
if (data.targets !== undefined) {
targets = data.targets
// validate data
try {
validateTargetsListFormat(targets)
// note: we don't check target names here
// as in jobd
} catch (e) {
connection.send(
new ResponseMessage(requestNo)
.setError(e.message)
)
return false
}
}
return targets
}
function usage() {
let s = `${process.argv[1]} OPTIONS

View File

@ -67,15 +67,26 @@ class WorkersList {
poke(targets) {
this.logger.debug('poke:', targets)
if (!Array.isArray(targets))
throw new Error('targets must be Array')
for (let t of targets)
this.targetsToPoke[t] = true
this._pokeWorkers()
}
/**
* @param targets
* @return {object[]}
*/
getWorkersByTargets(targets) {
const found = []
for (const worker of this.workers) {
const intrs = intersection(worker.targets, targets)
if (intrs.length > 0)
found.push(worker)
}
return found
}
/**
* @private
*/
@ -327,6 +338,40 @@ class WorkersList {
return response
}
/**
* @param {null|string[]} targets
*/
pauseTargets(targets) {
return this._pauseContinueWorkers('pause', targets)
}
/**
* @param {null|string[]} targets
*/
continueTargets(targets) {
return this._pauseContinueWorkers('continue', targets)
}
/**
* @param {string} action
* @param {null|string[]} targets
* @private
*/
_pauseContinueWorkers(action, targets) {
(targets === null ? this.workers : this.getWorkersByTargets(targets))
.map(worker => {
this.logger.debug(`${action}Targets: sending ${action} request to ${worker.connection.remoteAddr()}`)
let data = {}
if (targets !== null)
data.targets = intersection(worker.targets, targets)
worker.connection.sendRequest(
new RequestMessage(action, data)
).catch(this.onWorkerRequestError.bind(this, `${action}Targets`))
})
}
/**
* @private
*/
@ -338,6 +383,10 @@ class WorkersList {
})
}
onWorkerRequestError = (from, error) => {
this.logger.error(`${from}:`, error)
}
}
module.exports = WorkersList