jobd: add add-target()/remove-target(); code refactoring

This commit is contained in:
Evgeny Zinoviev 2021-03-16 01:06:14 +03:00
parent cbbe60df32
commit 23c16a2c80
8 changed files with 352 additions and 427 deletions

1
.gitignore vendored
View File

@ -1,2 +1,3 @@
.idea
node_modules
Inspections.xml

View File

@ -132,6 +132,10 @@ Here is the list of supported requests, using `type(arguments)` notation.
* **`run-manual(ids: int[])`** — enqueue and run jobs with specified IDs and
`status` set to `manual`, and return results.
* **`add-target(target: string, concurrency: int)`** — add target
* **`remove-target(target: string, concurrency: int)`** — remove target
* **`set-target-concurrency(target: string, concurrency: int)`** — set concurrency
of target `target`.

View File

@ -4,8 +4,12 @@ const loggerModule = require('./lib/logger')
const config = require('./lib/config')
const {Server, ResponseMessage} = require('./lib/server')
const WorkersList = require('./lib/workers-list')
const {validateObjectSchema, validateTargetsListFormat} = require('./lib/data-validator')
const RequestHandler = require('./lib/request-handler')
const {
validateObjectSchema,
validateInputTargetsListFormat,
validateInputTargets
} = require('./lib/data-validator')
const {RequestHandler} = require('./lib/request-handler')
const package_json = require('../package.json')
const DEFAULT_CONFIG_PATH = "/etc/jobd-master.conf"
@ -112,189 +116,6 @@ function initRequestHandler() {
requestHandler.set('continue', onContinue)
}
/**
* @param {object} data
* @param {number} requestNo
* @param {Connection} connection
*/
function onRegisterWorker(data, requestNo, connection) {
const targets = data.targets || []
// validate data
try {
validateTargetsListFormat(targets)
} catch (e) {
connection.send(
new ResponseMessage(requestNo)
.setError(e.message)
)
return
}
// register worker and reply with OK
workers.add(connection, targets)
connection.send(
new ResponseMessage(requestNo)
.setData('ok')
)
}
/**
* @param {object} data
* @param {number} requestNo
* @param {Connection} connection
*/
function onPoke(data, requestNo, connection) {
const targets = data.targets || []
// validate data
try {
validateTargetsListFormat(targets)
} catch (e) {
connection.send(
new ResponseMessage(requestNo)
.setError(e.message)
)
return
}
// poke workers
workers.poke(targets)
// reply to user
connection.send(
new ResponseMessage(requestNo)
.setData('ok')
)
}
/**
* @param {object} data
* @param {number} requestNo
* @param {Connection} connection
* @return {Promise<*>}
*/
async function onStatus(data, requestNo, connection) {
const info = await workers.getInfo(data.poll_workers || false)
let status = {
workers: info,
memoryUsage: process.memoryUsage()
}
connection.send(
new ResponseMessage(requestNo)
.setData(status)
)
}
/**
* @param {object} data
* @param {number} requestNo
* @param {Connection} connection
* @return {Promise<*>}
*/
async function onRunManual(data, requestNo, connection) {
const {jobs} = data
// validate data
try {
if (!Array.isArray(jobs))
throw new Error('jobs must be array')
for (let job of jobs) {
validateObjectSchema(job, [
// name // type // required
['id', 'i', true],
['target', 's', true],
])
}
} catch (e) {
connection.send(
new ResponseMessage(requestNo)
.setError(e.message)
)
return
}
// run jobs on workers
const jobsData = await workers.runManual(jobs)
// send result to the client
connection.send(
new ResponseMessage(requestNo)
.setData(jobsData)
)
}
/**
* @param {object} data
* @param {number} requestNo
* @param {Connection} connection
*/
function onPause(data, requestNo, connection) {
let targets
if ((targets = validateInputTargets(data, requestNo, connection)) === false)
return
workers.pauseTargets(targets)
connection.send(
new ResponseMessage(requestNo)
.setData('ok')
)
}
/**
* @param {object} data
* @param {number} requestNo
* @param {Connection} connection
*/
function onContinue(data, requestNo, connection) {
let targets
if ((targets = validateInputTargets(data, requestNo, connection)) === false)
return
workers.continueTargets(targets)
connection.send(
new ResponseMessage(requestNo)
.setData('ok')
)
}
/**
* @private
* @param data
* @param requestNo
* @param connection
* @return {null|boolean|string[]}
*/
function validateInputTargets(data, requestNo, connection) {
// null means all targets
let targets = null
if (data.targets !== undefined) {
targets = data.targets
// validate data
try {
validateTargetsListFormat(targets)
// note: we don't check target names here
// as in jobd
} catch (e) {
connection.send(
new ResponseMessage(requestNo)
.setError(e.message)
)
return false
}
}
return targets
}
function usage() {
let s = `${process.argv[1]} OPTIONS
@ -313,3 +134,86 @@ async function term() {
await loggerModule.shutdown()
process.exit()
}
/****************************************/
/** **/
/** Request handlers **/
/** **/
/****************************************/
/**
* @param {object} data
* @param {Connection} connection
*/
async function onRegisterWorker(data, connection) {
const targets = validateInputTargets(data, null)
workers.add(connection, targets)
return 'ok'
}
/**
* @param {object} data
*/
async function onPoke(data) {
const targets = validateInputTargets(data, null)
workers.poke(targets)
return 'ok'
}
/**
* @param {object} data
* @return {Promise<*>}
*/
async function onStatus(data) {
const info = await workers.getInfo(data.poll_workers || false)
return {
workers: info,
memoryUsage: process.memoryUsage()
}
}
/**
* @param {object} data
* @return {Promise<*>}
*/
async function onRunManual(data) {
const {jobs} = data
// validate input
if (!Array.isArray(jobs))
throw new Error('jobs must be array')
for (let job of jobs) {
validateObjectSchema(job, [
// name // type // required
['id', 'i', true],
['target', 's', true],
])
}
// run jobs, wait for results and send a response
return await workers.runManual(jobs)
}
/**
* @param {object} data
*/
function onPause(data) {
const targets = validateInputTargets(data, null)
workers.pauseTargets(targets)
return 'ok'
}
/**
* @param {object} data
* @param {number} requestNo
* @param {Connection} connection
*/
function onContinue(data, requestNo, connection) {
const targets = validateInputTargets(data, null)
workers.continueTargets(targets)
return 'ok'
}

View File

@ -5,8 +5,11 @@ const config = require('./lib/config')
const db = require('./lib/db')
const {uniq} = require('lodash')
const {createCallablePromise} = require('./lib/util')
const {validateObjectSchema, validateTargetsListFormat} = require('./lib/data-validator')
const RequestHandler = require('./lib/request-handler')
const {
validateInputTargetAndConcurrency,
validateInputTargets
} = require('./lib/data-validator')
const {RequestHandler} = require('./lib/request-handler')
const {
Server,
Connection,
@ -139,6 +142,8 @@ function initRequestHandler() {
requestHandler.set('run-manual', onRunManual)
requestHandler.set('pause', onPause)
requestHandler.set('continue', onContinue)
requestHandler.set('add-target', onAddTarget)
requestHandler.set('remove-target', onRemoveTarget)
requestHandler.set('set-target-concurrency', onSetTargetConcurrency)
}
@ -162,236 +167,6 @@ async function initDatabase() {
logger.info('db initialized')
}
/**
* @param {object} data
* @param {number} requestNo
* @param {Connection} connection
*/
function onPollRequest(data, requestNo, connection) {
let targets
if ((targets = validateInputTargets(data, requestNo, connection)) === false)
return
worker.setPollTargets(targets)
worker.poll()
connection.send(
new ResponseMessage(requestNo)
.setData('ok')
)
}
/**
* @param {object} data
* @param {number} requestNo
* @param {Connection} connection
*/
function onStatus(data, requestNo, connection) {
connection.send(
new ResponseMessage(requestNo)
.setData({
targets: worker.getStatus(),
jobPromisesCount: Object.keys(jobPromises).length,
memoryUsage: process.memoryUsage()
})
)
}
/**
* @param {object} data
* @param {number} requestNo
* @param {Connection} connection
* @return {Promise<void>}
*/
async function onRunManual(data, requestNo, connection) {
let {ids: jobIds} = data
jobIds = uniq(jobIds)
// if at least one of the jobs is already being run, reject
// if at least one item is not a number, reject
for (const id of jobIds) {
if (typeof id !== 'number') {
connection.send(
new ResponseMessage(requestNo)
.setError(`all ids must be numbers, got ${typeof id}`)
)
return
}
if (id in jobPromises) {
connection.send(
new ResponseMessage(requestNo)
.setError(`another client is already waiting for job ${id}`)
)
return
}
}
// create a bunch of promises, one per job
let promises = []
for (const id of jobIds) {
const P = createCallablePromise()
jobPromises[id] = P
promises.push(P)
}
// get jobs from database and enqueue for execution
const {results} = await worker.getTasks(null, STATUS_MANUAL, {ids: jobIds})
// wait till all jobs are done (or failed), then send a response
Promise.allSettled(promises).then(results => {
const response = {}
for (let i = 0; i < results.length; i++) {
let jobId = jobIds[i]
let result = results[i]
if (result.status === 'fulfilled') {
if (!('jobs' in response))
response.jobs = {}
if (result.value?.id !== undefined)
delete result.value.id
response.jobs[jobId] = result.value
} else if (result.status === 'rejected') {
if (!('errors' in response))
response.errors = {}
response.errors[jobId] = result.reason?.message
}
}
connection.send(
new ResponseMessage(requestNo)
.setData(response)
)
})
// reject all ignored / non-found jobs
for (const [id, value] of results.entries()) {
if (!(id in jobPromises)) {
this.logger.error(`run-manual: ${id} not found in jobPromises`)
continue
}
if (value.result === JOB_IGNORED || value.result === JOB_NOTFOUND) {
const P = jobPromises[id]
delete jobPromises[id]
if (value.result === JOB_IGNORED)
P.reject(new Error(value.reason))
else if (value.result === JOB_NOTFOUND)
P.reject(new Error(`job ${id} not found`))
}
}
}
/**
* @param {object} data
* @param {number} requestNo
* @param {Connection} connection
*/
function onPause(data, requestNo, connection) {
let targets
if ((targets = validateInputTargets(data, requestNo, connection)) === false)
return
worker.pauseTargets(targets)
connection.send(
new ResponseMessage(requestNo)
.setData('ok')
)
}
/**
* @param {object} data
* @param {number} requestNo
* @param {Connection} connection
*/
function onContinue(data, requestNo, connection) {
let targets
if ((targets = validateInputTargets(data, requestNo, connection)) === false)
return
// continue queues
worker.continueTargets(targets)
// poll just in case
worker.poll()
// ok
connection.send(
new ResponseMessage(requestNo)
.setData('ok')
)
}
/**
* @param {object} data
* @param {number} requestNo
* @param {Connection} connection
*/
function onSetTargetConcurrency(data, requestNo, connection) {
try {
validateObjectSchema(data, [
// name // type // required
['concurrency', 'i', true],
['target', 's', true],
])
if (data.concurrency <= 0)
throw new Error('Invalid concurrency value.')
} catch (e) {
connection.send(
new ResponseMessage(requestNo)
.setError(e.message)
)
return
}
worker.setTargetConcurrency(data.target, data.concurrency)
connection.send(
new ResponseMessage(requestNo)
.setData('ok')
)
}
/**
* @private
* @param data
* @param requestNo
* @param connection
* @return {null|boolean|string[]}
*/
function validateInputTargets(data, requestNo, connection) {
// null means all targets
let targets = null
if (data.targets !== undefined) {
targets = data.targets
// validate data
try {
validateTargetsListFormat(targets)
for (const t of targets) {
if (!worker.hasTarget(t))
throw new Error(`invalid target '${t}'`)
}
} catch (e) {
connection.send(
new ResponseMessage(requestNo)
.setError(e.message)
)
return false
}
}
return targets
}
function connectToMaster() {
const port = config.get('master_port')
const host = config.get('master_host')
@ -455,3 +230,167 @@ async function term() {
await loggerModule.shutdown()
process.exit()
}
/****************************************/
/** **/
/** Request handlers **/
/** **/
/****************************************/
/**
* @param {object} data
* @return {Promise<string>}
*/
async function onPollRequest(data) {
let targets = validateInputTargets(data, worker)
worker.setPollTargets(targets)
worker.poll()
return 'ok'
}
/**
* @param {object} data
* @return {Promise<object>}
*/
async function onStatus(data) {
return {
targets: worker.getStatus(),
jobPromisesCount: Object.keys(jobPromises).length,
memoryUsage: process.memoryUsage()
}
}
/**
* @param {{ids: number[]}} data
* @return {Promise}
*/
async function onRunManual(data) {
let {ids: jobIds} = data
jobIds = uniq(jobIds)
for (const id of jobIds) {
// if at least one item is not a number, reject
if (typeof id !== 'number')
throw new Error(`all ids must be numbers, got ${typeof id}`)
// if at least one of the jobs is already being run, reject
if (id in jobPromises)
throw new Error(`another client is already waiting for job ${id}`)
}
// create a bunch of promises, one per job
let promises = []
for (const id of jobIds) {
const P = createCallablePromise()
jobPromises[id] = P
promises.push(P)
}
// get jobs from database and enqueue for execution
const {results} = await worker.getTasks(null, STATUS_MANUAL, {ids: jobIds})
// wait till all jobs are done (or failed), then send a response
const P = Promise.allSettled(promises).then(results => {
const response = {}
for (let i = 0; i < results.length; i++) {
let jobId = jobIds[i]
let result = results[i]
if (result.status === 'fulfilled') {
if (!('jobs' in response))
response.jobs = {}
if (result.value?.id !== undefined)
delete result.value.id
response.jobs[jobId] = result.value
} else if (result.status === 'rejected') {
if (!('errors' in response))
response.errors = {}
response.errors[jobId] = result.reason?.message
}
}
return response
})
// reject all ignored / non-found jobs
for (const [id, value] of results.entries()) {
if (!(id in jobPromises)) {
this.logger.error(`run-manual: ${id} not found in jobPromises`)
continue
}
if (value.result === JOB_IGNORED || value.result === JOB_NOTFOUND) {
const P = jobPromises[id]
delete jobPromises[id]
if (value.result === JOB_IGNORED)
P.reject(new Error(value.reason))
else if (value.result === JOB_NOTFOUND)
P.reject(new Error(`job ${id} not found`))
}
}
return P
}
/**
* @param {{targets: string[]}} data
*/
async function onPause(data) {
let targets = validateInputTargets(data, worker)
worker.pauseTargets(targets)
return 'ok'
}
/**
* @param {{targets: string[]}} data
*/
async function onContinue(data) {
let targets
if ((targets = validateInputTargets(data, worker)) === false)
return
// continue queues
worker.continueTargets(targets)
// poll just in case
worker.poll()
return 'ok'
}
/**
* @param {{target: string, concurrency: int}} data
*/
async function onAddTarget(data) {
validateInputTargetAndConcurrency(data)
worker.addTarget(data.target, data.concurrency)
return 'ok'
}
/**
* @param {{target: string}} data
*/
async function onRemoveTarget(data) {
validateInputTargetAndConcurrency(data, true)
worker.removeTarget(data.target)
return 'ok'
}
/**
* @param {object} data
*/
async function onSetTargetConcurrency(data) {
validateInputTargetAndConcurrency(data)
worker.setTargetConcurrency(data.target, data.concurrency)
return 'ok'
}

View File

@ -11,6 +11,11 @@ const typeNames = {
const logger = getLogger('data-validator')
/**************************************/
/** Common Functions **/
/**************************************/
/**
* @param {string} expectedType
* @param value
@ -69,7 +74,12 @@ function validateObjectSchema(data, schema) {
}
}
function validateTargetsListFormat(targets) {
/********************************************/
/** Request input data validators */
/********************************************/
function validateInputTargetsListFormat(targets) {
if (!Array.isArray(targets))
throw new Error('targets must be array')
@ -83,7 +93,51 @@ function validateTargetsListFormat(targets) {
}
}
function validateInputTargetAndConcurrency(data, onlyTarget = false) {
const schema = [
['target', 's', true],
]
if (!onlyTarget) {
schema.push(
['concurrency', 'i', true]
)
}
validateObjectSchema(data, schema)
if (!onlyTarget && data.concurrency <= 0)
throw new Error('Invalid concurrency value.')
}
/**
* @param data
* @param {Worker|null} worker
* @return {null|string[]}
*/
function validateInputTargets(data, worker) {
// null means all targets
let targets = null
if (data.targets !== undefined) {
targets = data.targets
validateInputTargetsListFormat(targets)
if (worker !== null) {
for (const t of targets) {
if (!worker.hasTarget(t))
throw new Error(`invalid target '${t}'`)
}
}
}
return targets
}
module.exports = {
validateObjectSchema,
validateTargetsListFormat
validateInputTargetsListFormat,
validateInputTargetAndConcurrency,
validateInputTargets,
}

View File

@ -101,10 +101,9 @@ module.exports = {
},
/**
* @param cb
* @return {Promise}
*/
shutdown(cb) {
shutdown() {
return new Promise((resolve, reject) => {
log4js.shutdown(resolve)
})

View File

@ -35,14 +35,17 @@ class RequestHandler {
if (this.handlers.has(message.requestType)) {
const f = this.handlers.get(message.requestType)
const result = f(message.requestData || {}, message.requestNo, connection)
const result = f(message.requestData || {}, connection)
if (result instanceof Promise) {
result.catch(error => {
this.logger.error(`${message.requestType}:`, error)
result.then(data => {
connection.send(
new ResponseMessage(message.requestNo)
.setError('server error: ' + error?.message)
.setData(data)
)
}).catch(error => {
connection.send(
new ResponseMessage(message.requestNo)
.setError(error?.message)
)
})
}
@ -56,4 +59,6 @@ class RequestHandler {
}
module.exports = RequestHandler
module.exports = {
RequestHandler
}

View File

@ -72,6 +72,25 @@ class Worker extends EventEmitter {
}
}
/**
* Deletes a queue.
*
* @param {string} target
*/
removeTarget(target) {
if (!(target in this.targets))
throw new Error(`target '${target}' not found`)
const {queue} = this.targets[target]
if (queue.length > 0)
throw new Error(`queue is not empty`)
this.logger.debug(`deleteTarget: deleting target' ${target}'`)
queue.removeAllListeners()
queue.end()
delete this.targets[target]
}
/**
* @param {string} target
* @param {number} concurrency