From 23c16a2c80f0614d0b31cba363bca66e1a60687b Mon Sep 17 00:00:00 2001 From: Evgeny Zinoviev Date: Tue, 16 Mar 2021 01:06:14 +0300 Subject: [PATCH] jobd: add add-target()/remove-target(); code refactoring --- .gitignore | 1 + README.md | 4 + src/jobd-master.js | 274 ++++++++----------------- src/jobd.js | 403 ++++++++++++++++--------------------- src/lib/data-validator.js | 58 +++++- src/lib/logger.js | 3 +- src/lib/request-handler.js | 17 +- src/lib/worker.js | 19 ++ 8 files changed, 352 insertions(+), 427 deletions(-) diff --git a/.gitignore b/.gitignore index 7a1537b..1bc0523 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ .idea node_modules +Inspections.xml \ No newline at end of file diff --git a/README.md b/README.md index 9a4bc6f..a158235 100644 --- a/README.md +++ b/README.md @@ -132,6 +132,10 @@ Here is the list of supported requests, using `type(arguments)` notation. * **`run-manual(ids: int[])`** — enqueue and run jobs with specified IDs and `status` set to `manual`, and return results. +* **`add-target(target: string, concurrency: int)`** — add target + +* **`remove-target(target: string, concurrency: int)`** — remove target + * **`set-target-concurrency(target: string, concurrency: int)`** — set concurrency of target `target`. diff --git a/src/jobd-master.js b/src/jobd-master.js index 9827126..eeac085 100755 --- a/src/jobd-master.js +++ b/src/jobd-master.js @@ -4,8 +4,12 @@ const loggerModule = require('./lib/logger') const config = require('./lib/config') const {Server, ResponseMessage} = require('./lib/server') const WorkersList = require('./lib/workers-list') -const {validateObjectSchema, validateTargetsListFormat} = require('./lib/data-validator') -const RequestHandler = require('./lib/request-handler') +const { + validateObjectSchema, + validateInputTargetsListFormat, + validateInputTargets +} = require('./lib/data-validator') +const {RequestHandler} = require('./lib/request-handler') const package_json = require('../package.json') const DEFAULT_CONFIG_PATH = "/etc/jobd-master.conf" @@ -112,189 +116,6 @@ function initRequestHandler() { requestHandler.set('continue', onContinue) } -/** - * @param {object} data - * @param {number} requestNo - * @param {Connection} connection - */ -function onRegisterWorker(data, requestNo, connection) { - const targets = data.targets || [] - - // validate data - try { - validateTargetsListFormat(targets) - } catch (e) { - connection.send( - new ResponseMessage(requestNo) - .setError(e.message) - ) - return - } - - // register worker and reply with OK - workers.add(connection, targets) - connection.send( - new ResponseMessage(requestNo) - .setData('ok') - ) -} - -/** - * @param {object} data - * @param {number} requestNo - * @param {Connection} connection - */ -function onPoke(data, requestNo, connection) { - const targets = data.targets || [] - - // validate data - try { - validateTargetsListFormat(targets) - } catch (e) { - connection.send( - new ResponseMessage(requestNo) - .setError(e.message) - ) - return - } - - // poke workers - workers.poke(targets) - - // reply to user - connection.send( - new ResponseMessage(requestNo) - .setData('ok') - ) -} - -/** - * @param {object} data - * @param {number} requestNo - * @param {Connection} connection - * @return {Promise<*>} - */ -async function onStatus(data, requestNo, connection) { - const info = await workers.getInfo(data.poll_workers || false) - - let status = { - workers: info, - memoryUsage: process.memoryUsage() - } - - connection.send( - new ResponseMessage(requestNo) - .setData(status) - ) -} - -/** - * @param {object} data - * @param {number} requestNo - * @param {Connection} connection - * @return {Promise<*>} - */ -async function onRunManual(data, requestNo, connection) { - const {jobs} = data - - // validate data - try { - if (!Array.isArray(jobs)) - throw new Error('jobs must be array') - - for (let job of jobs) { - validateObjectSchema(job, [ - // name // type // required - ['id', 'i', true], - ['target', 's', true], - ]) - } - } catch (e) { - connection.send( - new ResponseMessage(requestNo) - .setError(e.message) - ) - return - } - - // run jobs on workers - const jobsData = await workers.runManual(jobs) - - // send result to the client - connection.send( - new ResponseMessage(requestNo) - .setData(jobsData) - ) -} - -/** - * @param {object} data - * @param {number} requestNo - * @param {Connection} connection - */ -function onPause(data, requestNo, connection) { - let targets - if ((targets = validateInputTargets(data, requestNo, connection)) === false) - return - - workers.pauseTargets(targets) - connection.send( - new ResponseMessage(requestNo) - .setData('ok') - ) -} - -/** - * @param {object} data - * @param {number} requestNo - * @param {Connection} connection - */ -function onContinue(data, requestNo, connection) { - let targets - if ((targets = validateInputTargets(data, requestNo, connection)) === false) - return - - workers.continueTargets(targets) - connection.send( - new ResponseMessage(requestNo) - .setData('ok') - ) -} - - -/** - * @private - * @param data - * @param requestNo - * @param connection - * @return {null|boolean|string[]} - */ -function validateInputTargets(data, requestNo, connection) { - // null means all targets - let targets = null - - if (data.targets !== undefined) { - targets = data.targets - - // validate data - try { - validateTargetsListFormat(targets) - - // note: we don't check target names here - // as in jobd - } catch (e) { - connection.send( - new ResponseMessage(requestNo) - .setError(e.message) - ) - return false - } - } - - return targets -} - - function usage() { let s = `${process.argv[1]} OPTIONS @@ -313,3 +134,86 @@ async function term() { await loggerModule.shutdown() process.exit() } + + + +/****************************************/ +/** **/ +/** Request handlers **/ +/** **/ +/****************************************/ + +/** + * @param {object} data + * @param {Connection} connection + */ +async function onRegisterWorker(data, connection) { + const targets = validateInputTargets(data, null) + workers.add(connection, targets) + return 'ok' +} + +/** + * @param {object} data + */ +async function onPoke(data) { + const targets = validateInputTargets(data, null) + workers.poke(targets) + return 'ok' +} + +/** + * @param {object} data + * @return {Promise<*>} + */ +async function onStatus(data) { + const info = await workers.getInfo(data.poll_workers || false) + return { + workers: info, + memoryUsage: process.memoryUsage() + } +} + +/** + * @param {object} data + * @return {Promise<*>} + */ +async function onRunManual(data) { + const {jobs} = data + + // validate input + if (!Array.isArray(jobs)) + throw new Error('jobs must be array') + + for (let job of jobs) { + validateObjectSchema(job, [ + // name // type // required + ['id', 'i', true], + ['target', 's', true], + ]) + } + + // run jobs, wait for results and send a response + return await workers.runManual(jobs) +} + +/** + * @param {object} data + */ +function onPause(data) { + const targets = validateInputTargets(data, null) + workers.pauseTargets(targets) + return 'ok' +} + +/** + * @param {object} data + * @param {number} requestNo + * @param {Connection} connection + */ +function onContinue(data, requestNo, connection) { + const targets = validateInputTargets(data, null) + workers.continueTargets(targets) + return 'ok' +} + diff --git a/src/jobd.js b/src/jobd.js index e1331d1..bb912fc 100755 --- a/src/jobd.js +++ b/src/jobd.js @@ -5,8 +5,11 @@ const config = require('./lib/config') const db = require('./lib/db') const {uniq} = require('lodash') const {createCallablePromise} = require('./lib/util') -const {validateObjectSchema, validateTargetsListFormat} = require('./lib/data-validator') -const RequestHandler = require('./lib/request-handler') +const { + validateInputTargetAndConcurrency, + validateInputTargets +} = require('./lib/data-validator') +const {RequestHandler} = require('./lib/request-handler') const { Server, Connection, @@ -139,6 +142,8 @@ function initRequestHandler() { requestHandler.set('run-manual', onRunManual) requestHandler.set('pause', onPause) requestHandler.set('continue', onContinue) + requestHandler.set('add-target', onAddTarget) + requestHandler.set('remove-target', onRemoveTarget) requestHandler.set('set-target-concurrency', onSetTargetConcurrency) } @@ -162,236 +167,6 @@ async function initDatabase() { logger.info('db initialized') } -/** - * @param {object} data - * @param {number} requestNo - * @param {Connection} connection - */ -function onPollRequest(data, requestNo, connection) { - let targets - if ((targets = validateInputTargets(data, requestNo, connection)) === false) - return - - worker.setPollTargets(targets) - worker.poll() - - connection.send( - new ResponseMessage(requestNo) - .setData('ok') - ) -} - -/** - * @param {object} data - * @param {number} requestNo - * @param {Connection} connection - */ -function onStatus(data, requestNo, connection) { - connection.send( - new ResponseMessage(requestNo) - .setData({ - targets: worker.getStatus(), - jobPromisesCount: Object.keys(jobPromises).length, - memoryUsage: process.memoryUsage() - }) - ) -} - -/** - * @param {object} data - * @param {number} requestNo - * @param {Connection} connection - * @return {Promise} - */ -async function onRunManual(data, requestNo, connection) { - let {ids: jobIds} = data - jobIds = uniq(jobIds) - - // if at least one of the jobs is already being run, reject - // if at least one item is not a number, reject - for (const id of jobIds) { - if (typeof id !== 'number') { - connection.send( - new ResponseMessage(requestNo) - .setError(`all ids must be numbers, got ${typeof id}`) - ) - return - } - - if (id in jobPromises) { - connection.send( - new ResponseMessage(requestNo) - .setError(`another client is already waiting for job ${id}`) - ) - return - } - } - - // create a bunch of promises, one per job - let promises = [] - for (const id of jobIds) { - const P = createCallablePromise() - jobPromises[id] = P - promises.push(P) - } - - // get jobs from database and enqueue for execution - const {results} = await worker.getTasks(null, STATUS_MANUAL, {ids: jobIds}) - - // wait till all jobs are done (or failed), then send a response - Promise.allSettled(promises).then(results => { - const response = {} - - for (let i = 0; i < results.length; i++) { - let jobId = jobIds[i] - let result = results[i] - - if (result.status === 'fulfilled') { - if (!('jobs' in response)) - response.jobs = {} - - if (result.value?.id !== undefined) - delete result.value.id - - response.jobs[jobId] = result.value - } else if (result.status === 'rejected') { - if (!('errors' in response)) - response.errors = {} - - response.errors[jobId] = result.reason?.message - } - } - - connection.send( - new ResponseMessage(requestNo) - .setData(response) - ) - }) - - // reject all ignored / non-found jobs - for (const [id, value] of results.entries()) { - if (!(id in jobPromises)) { - this.logger.error(`run-manual: ${id} not found in jobPromises`) - continue - } - - if (value.result === JOB_IGNORED || value.result === JOB_NOTFOUND) { - const P = jobPromises[id] - delete jobPromises[id] - - if (value.result === JOB_IGNORED) - P.reject(new Error(value.reason)) - - else if (value.result === JOB_NOTFOUND) - P.reject(new Error(`job ${id} not found`)) - } - } -} - -/** - * @param {object} data - * @param {number} requestNo - * @param {Connection} connection - */ -function onPause(data, requestNo, connection) { - let targets - if ((targets = validateInputTargets(data, requestNo, connection)) === false) - return - - worker.pauseTargets(targets) - connection.send( - new ResponseMessage(requestNo) - .setData('ok') - ) -} - -/** - * @param {object} data - * @param {number} requestNo - * @param {Connection} connection - */ -function onContinue(data, requestNo, connection) { - let targets - if ((targets = validateInputTargets(data, requestNo, connection)) === false) - return - - // continue queues - worker.continueTargets(targets) - - // poll just in case - worker.poll() - - // ok - connection.send( - new ResponseMessage(requestNo) - .setData('ok') - ) -} - -/** - * @param {object} data - * @param {number} requestNo - * @param {Connection} connection - */ -function onSetTargetConcurrency(data, requestNo, connection) { - try { - validateObjectSchema(data, [ - // name // type // required - ['concurrency', 'i', true], - ['target', 's', true], - ]) - - if (data.concurrency <= 0) - throw new Error('Invalid concurrency value.') - } catch (e) { - connection.send( - new ResponseMessage(requestNo) - .setError(e.message) - ) - return - } - - worker.setTargetConcurrency(data.target, data.concurrency) - connection.send( - new ResponseMessage(requestNo) - .setData('ok') - ) -} - -/** - * @private - * @param data - * @param requestNo - * @param connection - * @return {null|boolean|string[]} - */ -function validateInputTargets(data, requestNo, connection) { - // null means all targets - let targets = null - - if (data.targets !== undefined) { - targets = data.targets - - // validate data - try { - validateTargetsListFormat(targets) - - for (const t of targets) { - if (!worker.hasTarget(t)) - throw new Error(`invalid target '${t}'`) - } - } catch (e) { - connection.send( - new ResponseMessage(requestNo) - .setError(e.message) - ) - return false - } - } - - return targets -} - function connectToMaster() { const port = config.get('master_port') const host = config.get('master_host') @@ -455,3 +230,167 @@ async function term() { await loggerModule.shutdown() process.exit() } + + + +/****************************************/ +/** **/ +/** Request handlers **/ +/** **/ +/****************************************/ + +/** + * @param {object} data + * @return {Promise} + */ +async function onPollRequest(data) { + let targets = validateInputTargets(data, worker) + + worker.setPollTargets(targets) + worker.poll() + + return 'ok' +} + +/** + * @param {object} data + * @return {Promise} + */ +async function onStatus(data) { + return { + targets: worker.getStatus(), + jobPromisesCount: Object.keys(jobPromises).length, + memoryUsage: process.memoryUsage() + } +} + +/** + * @param {{ids: number[]}} data + * @return {Promise} + */ +async function onRunManual(data) { + let {ids: jobIds} = data + jobIds = uniq(jobIds) + + for (const id of jobIds) { + // if at least one item is not a number, reject + if (typeof id !== 'number') + throw new Error(`all ids must be numbers, got ${typeof id}`) + + // if at least one of the jobs is already being run, reject + if (id in jobPromises) + throw new Error(`another client is already waiting for job ${id}`) + } + + // create a bunch of promises, one per job + let promises = [] + for (const id of jobIds) { + const P = createCallablePromise() + jobPromises[id] = P + promises.push(P) + } + + // get jobs from database and enqueue for execution + const {results} = await worker.getTasks(null, STATUS_MANUAL, {ids: jobIds}) + + // wait till all jobs are done (or failed), then send a response + const P = Promise.allSettled(promises).then(results => { + const response = {} + + for (let i = 0; i < results.length; i++) { + let jobId = jobIds[i] + let result = results[i] + + if (result.status === 'fulfilled') { + if (!('jobs' in response)) + response.jobs = {} + + if (result.value?.id !== undefined) + delete result.value.id + + response.jobs[jobId] = result.value + } else if (result.status === 'rejected') { + if (!('errors' in response)) + response.errors = {} + + response.errors[jobId] = result.reason?.message + } + } + + return response + }) + + // reject all ignored / non-found jobs + for (const [id, value] of results.entries()) { + if (!(id in jobPromises)) { + this.logger.error(`run-manual: ${id} not found in jobPromises`) + continue + } + + if (value.result === JOB_IGNORED || value.result === JOB_NOTFOUND) { + const P = jobPromises[id] + delete jobPromises[id] + + if (value.result === JOB_IGNORED) + P.reject(new Error(value.reason)) + + else if (value.result === JOB_NOTFOUND) + P.reject(new Error(`job ${id} not found`)) + } + } + + return P +} + +/** + * @param {{targets: string[]}} data + */ +async function onPause(data) { + let targets = validateInputTargets(data, worker) + worker.pauseTargets(targets) + return 'ok' +} + +/** + * @param {{targets: string[]}} data + */ +async function onContinue(data) { + let targets + if ((targets = validateInputTargets(data, worker)) === false) + return + + // continue queues + worker.continueTargets(targets) + + // poll just in case + worker.poll() + + return 'ok' +} + +/** + * @param {{target: string, concurrency: int}} data + */ +async function onAddTarget(data) { + validateInputTargetAndConcurrency(data) + worker.addTarget(data.target, data.concurrency) + return 'ok' +} + +/** + * @param {{target: string}} data + */ +async function onRemoveTarget(data) { + validateInputTargetAndConcurrency(data, true) + worker.removeTarget(data.target) + return 'ok' +} + +/** + * @param {object} data + */ +async function onSetTargetConcurrency(data) { + validateInputTargetAndConcurrency(data) + worker.setTargetConcurrency(data.target, data.concurrency) + return 'ok' +} \ No newline at end of file diff --git a/src/lib/data-validator.js b/src/lib/data-validator.js index 7419b34..74827c1 100644 --- a/src/lib/data-validator.js +++ b/src/lib/data-validator.js @@ -11,6 +11,11 @@ const typeNames = { const logger = getLogger('data-validator') + +/**************************************/ +/** Common Functions **/ +/**************************************/ + /** * @param {string} expectedType * @param value @@ -69,7 +74,12 @@ function validateObjectSchema(data, schema) { } } -function validateTargetsListFormat(targets) { + +/********************************************/ +/** Request input data validators */ +/********************************************/ + +function validateInputTargetsListFormat(targets) { if (!Array.isArray(targets)) throw new Error('targets must be array') @@ -83,7 +93,51 @@ function validateTargetsListFormat(targets) { } } +function validateInputTargetAndConcurrency(data, onlyTarget = false) { + const schema = [ + ['target', 's', true], + ] + + if (!onlyTarget) { + schema.push( + ['concurrency', 'i', true] + ) + } + + validateObjectSchema(data, schema) + + if (!onlyTarget && data.concurrency <= 0) + throw new Error('Invalid concurrency value.') +} + +/** + * @param data + * @param {Worker|null} worker + * @return {null|string[]} + */ +function validateInputTargets(data, worker) { + // null means all targets + let targets = null + + if (data.targets !== undefined) { + targets = data.targets + + validateInputTargetsListFormat(targets) + + if (worker !== null) { + for (const t of targets) { + if (!worker.hasTarget(t)) + throw new Error(`invalid target '${t}'`) + } + } + } + + return targets +} + module.exports = { validateObjectSchema, - validateTargetsListFormat + validateInputTargetsListFormat, + validateInputTargetAndConcurrency, + validateInputTargets, } \ No newline at end of file diff --git a/src/lib/logger.js b/src/lib/logger.js index f886e0c..b71020c 100644 --- a/src/lib/logger.js +++ b/src/lib/logger.js @@ -101,10 +101,9 @@ module.exports = { }, /** - * @param cb * @return {Promise} */ - shutdown(cb) { + shutdown() { return new Promise((resolve, reject) => { log4js.shutdown(resolve) }) diff --git a/src/lib/request-handler.js b/src/lib/request-handler.js index 4330b6b..e7f9fe2 100644 --- a/src/lib/request-handler.js +++ b/src/lib/request-handler.js @@ -35,14 +35,17 @@ class RequestHandler { if (this.handlers.has(message.requestType)) { const f = this.handlers.get(message.requestType) - const result = f(message.requestData || {}, message.requestNo, connection) + const result = f(message.requestData || {}, connection) if (result instanceof Promise) { - result.catch(error => { - this.logger.error(`${message.requestType}:`, error) - + result.then(data => { connection.send( new ResponseMessage(message.requestNo) - .setError('server error: ' + error?.message) + .setData(data) + ) + }).catch(error => { + connection.send( + new ResponseMessage(message.requestNo) + .setError(error?.message) ) }) } @@ -56,4 +59,6 @@ class RequestHandler { } -module.exports = RequestHandler \ No newline at end of file +module.exports = { + RequestHandler +} \ No newline at end of file diff --git a/src/lib/worker.js b/src/lib/worker.js index e53d03f..0be6a19 100644 --- a/src/lib/worker.js +++ b/src/lib/worker.js @@ -72,6 +72,25 @@ class Worker extends EventEmitter { } } + /** + * Deletes a queue. + * + * @param {string} target + */ + removeTarget(target) { + if (!(target in this.targets)) + throw new Error(`target '${target}' not found`) + + const {queue} = this.targets[target] + if (queue.length > 0) + throw new Error(`queue is not empty`) + + this.logger.debug(`deleteTarget: deleting target' ${target}'`) + queue.removeAllListeners() + queue.end() + delete this.targets[target] + } + /** * @param {string} target * @param {number} concurrency