jobd: add set-target-concurrency request

This commit is contained in:
Evgeny Zinoviev 2021-03-05 03:01:06 +03:00
parent a191c5c82c
commit 613fd5fd3e
3 changed files with 43 additions and 1 deletions

View File

@ -67,6 +67,9 @@ for target is not enough for you, change it to fit your needs.
* **`run-manual(ids: int[])`** — enqueue and run jobs with specified IDs and
`status` set to `manual`, and return results.
* **`set-target-concurrency(target: string, concurrency: int)`** — set concurrency
of target `target`.
### jobd-master requests
* **`register-worker(targets: string[])`** — used by a jobd instance to register

View File

@ -5,7 +5,7 @@ const config = require('./lib/config')
const db = require('./lib/db')
const {uniq} = require('lodash')
const {createCallablePromise} = require('./lib/util')
const {validateTargetsListFormat} = require('./lib/data-validator')
const {validateObjectSchema, validateTargetsListFormat} = require('./lib/data-validator')
const RequestHandler = require('./lib/request-handler')
const {
Server,
@ -136,6 +136,7 @@ function initRequestHandler() {
requestHandler.set('run-manual', onRunManual)
requestHandler.set('pause', onPause)
requestHandler.set('continue', onContinue)
requestHandler.set('set-target-concurrency', onSetTargetConcurrency)
}
function initServer() {
@ -324,6 +325,33 @@ function onContinue(data, requestNo, connection) {
)
}
/**
* @param {object} data
* @param {number} requestNo
* @param {Connection} connection
*/
function onSetTargetConcurrency(data, requestNo, connection) {
try {
validateObjectSchema(data, [
// name // type // required
['concurrency', 'i', true],
['target', 's', true],
])
} catch (e) {
connection.send(
new ResponseMessage(requestNo)
.setError(e.message)
)
return
}
worker.setTargetConcurrency(data.target, data.concurrency)
connection.send(
new ResponseMessage(requestNo)
.setData('ok')
)
}
/**
* @private
* @param data

View File

@ -72,6 +72,17 @@ class Worker extends EventEmitter {
}
}
/**
* @param {string} target
* @param {number} concurrency
*/
setTargetConcurrency(target, concurrency) {
if (!(target in this.targets))
throw new Error(`target '${target}' not found`)
this.targets[target].queue.concurrency = concurrency
}
/**
* Stop queues associated with specified targets.
*