drop slots, leave only targets. change config format.

This commit is contained in:
Evgeny Zinoviev 2021-03-05 02:49:32 +03:00
parent 82ae43f5de
commit 61ceac1e56
5 changed files with 90 additions and 126 deletions

View File

@ -25,7 +25,6 @@ In a real world, you would to add need additional fields such as `job_name` or
CREATE TABLE `jobs` (
`id` int(10) UNSIGNED NOT NULL AUTO_INCREMENT,
`target` char(16) NOT NULL,
`slot` char(16) DEFAULT NULL,
`time_created` int(10) UNSIGNED NOT NULL,
`time_started` int(10) UNSIGNED NOT NULL DEFAULT 0,
`time_finished` int(10) UNSIGNED NOT NULL DEFAULT 0,
@ -40,7 +39,8 @@ CREATE TABLE `jobs` (
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
```
For optimization purposes, you can turn fields `target` and `slot` into `ENUM`s.
For optimization purposes, you can turn `target` into `ENUM`. Also if 16 characters
for target is not enough for you, change it to fit your needs.
## Clients

View File

@ -25,14 +25,8 @@ mysql_fetch_limit = 10
launcher = php /Users/ch1p/jobd-launcher.php --id {id}
max_output_buffer = 16777216
;
; targets
;
[server1]
low = 5
normal = 5
high = 5
[global]
normal = 3
[targets]
1/low = 2
1/normal = 5
1/high = 10
global = 3

View File

@ -107,16 +107,13 @@ async function initApp(appName) {
function initWorker() {
worker = new Worker()
for (let targetName in config.get('targets')) {
let slots = config.get('targets')[targetName].slots
// let target = new Target({name: targetName})
// queue.addTarget(target)
for (let slotName in slots) {
let slotLimit = slots[slotName]
worker.addSlot(targetName, slotName, slotLimit)
}
const targets = config.get('targets')
for (const target in targets) {
let limit = targets[target]
worker.addTarget(target, limit)
}
worker.on('job-done', (data) => {
if (jobPromises[data.id] !== undefined) {
const P = jobPromises[data.id]
@ -186,11 +183,10 @@ function onPollRequest(data, requestNo, connection) {
* @param {Connection} connection
*/
function onStatus(data, requestNo, connection) {
const qs = worker.getStatus()
connection.send(
new ResponseMessage(requestNo)
.setData({
targets: qs.targets,
targets: worker.getStatus(),
jobPromisesCount: Object.keys(jobPromises).length,
memoryUsage: process.memoryUsage()
})

View File

@ -34,6 +34,12 @@ function processScheme(source, scheme) {
throw new Error(`'${key}' must be a float`)
value = parseFloat(value)
break
case 'object':
if (typeof value !== 'object')
throw new Error(`'${key}' must be an object`)
break
}
result[key] = value
@ -69,26 +75,23 @@ function parseWorkerConfig(file) {
launcher: {required: true},
max_output_buffer: {default: 1024*1024, type: 'int'},
targets: {required: true, type: 'object'},
}
Object.assign(config, processScheme(raw, scheme))
config.targets = {}
// targets
for (let target in raw) {
for (let target in raw.targets) {
if (target === 'null')
throw new Error('word \'null\' is reserved, please don\'t use it as a target name')
if (typeof raw[target] !== 'object')
continue
if (!isNumeric(raw.targets[target]))
throw new Error(`value of target '${target}' must be a number`)
config.targets[target] = {slots: {}}
for (let slotName in raw[target]) {
let slotLimit = parseInt(raw[target][slotName], 10)
if (slotLimit < 1)
throw new Error(`${target}: slot ${slotName} has invalid limit`)
config.targets[target].slots[slotName] = slotLimit
}
let value = parseInt(raw.targets[target], 10)
if (value < 1)
throw new Error(`target '${target}' has invalid value`)
config.targets[target] = value
}
}

View File

@ -26,7 +26,7 @@ class Worker extends EventEmitter {
super()
/**
* @type {object.<string, {slots: object.<string, Queue>, paused: boolean}>}
* @type {object.<string, {queue: Queue, paused: boolean}>}
*/
this.targets = {}
@ -50,30 +50,26 @@ class Worker extends EventEmitter {
* Creates new queue.
*
* @param {string} target
* @param {string} slot
* @param {number} limit
*/
addSlot(target, slot, limit) {
this.logger.debug(`addSlot: adding slot '${slot}' for target' ${target}' (limit: ${limit})`)
addTarget(target, limit) {
this.logger.debug(`addTarget: adding target' ${target}', limit = ${limit}`)
if (this.targets[target] === undefined)
this.targets[target] = {
slots: {},
paused: false
}
if (this.targets[target].slots[slot] !== undefined)
throw new Error(`slot ${slot} for target ${target} has already been added`)
if (target in this.targets)
throw new Error(`target '${target}' already added`)
let queue = Queue({
concurrency: limit,
autostart: true
})
queue.on('success', this.onJobFinished.bind(this, target, slot))
queue.on('error', this.onJobFinished.bind(this, target, slot))
queue.on('success', this.onJobFinished.bind(this, target))
queue.on('error', this.onJobFinished.bind(this, target))
queue.start()
this.targets[target].slots[slot] = queue
this.targets[target] = {
paused: false,
queue
}
}
/**
@ -85,19 +81,17 @@ class Worker extends EventEmitter {
if (targets === null)
targets = this.getTargets()
for (const targetName of targets) {
const target = this.targets[targetName]
if (target.paused) {
this.logger.warn(`pauseTargets: ${targetName} is already paused`)
for (const target of targets) {
const {queue, paused} = this.targets[target]
if (paused) {
this.logger.warn(`pauseTargets: ${target} is already paused`)
continue
}
for (const slotName in target.slots) {
this.logger.debug(`pauseTargets: stopping ${targetName}/${slotName} queue`)
target.slots[slotName].stop()
}
this.logger.debug(`pauseTargets: stopping ${target}`)
queue.stop()
target.paused = true
this.targets[target].paused = true
}
}
@ -110,19 +104,17 @@ class Worker extends EventEmitter {
if (targets === null)
targets = this.getTargets()
for (const targetName of targets) {
const target = this.targets[targetName]
if (!target.paused) {
this.logger.warn(`continueTargets: ${targetName} is not paused`)
for (const target of targets) {
const {queue, paused} = this.targets[target]
if (!paused) {
this.logger.warn(`continueTargets: ${target} is not paused`)
continue
}
for (const slotName in target.slots) {
this.logger.debug(`pauseTargets: starting ${targetName}/${slotName} queue`)
target.slots[slotName].start()
}
this.logger.debug(`pauseTargets: starting ${target}`)
queue.start()
target.paused = false
this.targets[target].paused = false
}
}
@ -142,21 +134,18 @@ class Worker extends EventEmitter {
* @return {object}
*/
getStatus() {
let status = {targets: {}}
for (const targetName in this.targets) {
let target = this.targets[targetName]
status.targets[targetName] = {
paused: target.paused,
slots: {}
}
for (const slotName in target.slots) {
const queue = target.slots[slotName]
status.targets[targetName].slots[slotName] = {
let status = {}
for (const target in this.targets) {
if (!this.targets.hasOwnProperty(target))
continue
const {queue, paused} = this.targets[target]
status[target] = {
paused,
concurrency: queue.concurrency,
length: queue.length,
}
}
}
return status
}
@ -188,10 +177,10 @@ class Worker extends EventEmitter {
return
}
// skip and postpone the poll, if no free slots
// skip and postpone the poll, if no free targets
// it will be called again from onJobFinished()
if (!this.hasFreeSlots(targets)) {
this.logger.debug(`${LOGPREFIX} no free slots`)
if (!this.hasFreeTargets(targets)) {
this.logger.debug(`${LOGPREFIX} no free targets`)
return
}
@ -279,7 +268,7 @@ class Worker extends EventEmitter {
* @param {{ids: number[]}} data
* @returns
* {Promise<{
* results: Map<number, {status: number, reason: string, slot: string, target: string}>,
* results: Map<number, {status: number, reason: string, target: string}>,
* rowsCount: number
* }>}
*/
@ -290,11 +279,11 @@ class Worker extends EventEmitter {
await db.beginTransaction()
/**
* @type {Map<number, {status: number, reason: string, slot: string, target: string}>}
* @type {Map<number, {status: number, reason: string, target: string}>}
*/
const jobsResults = new Map()
let sqlFields = `id, status, target, slot`
let sqlFields = `id, status, target`
let sql
if (data.ids) {
sql = `SELECT ${sqlFields} FROM ${config.get('mysql_table')} WHERE id IN(`+data.ids.map(db.escape).join(',')+`) FOR UPDATE`
@ -318,7 +307,6 @@ class Worker extends EventEmitter {
for (let result of rows) {
const id = parseInt(result.id)
const slot = String(result.slot)
const target = String(result.target)
const status = String(result.status)
@ -333,7 +321,7 @@ class Worker extends EventEmitter {
continue
}
if (!target || this.targets[target] === undefined) {
if (!target || !(target in this.targets)) {
let reason = `target '${target}' not found (job id=${id})`
jobsResults.set(id, {
result: JOB_IGNORED,
@ -344,23 +332,11 @@ class Worker extends EventEmitter {
continue
}
if (!slot || this.targets[target].slots[slot] === undefined) {
let reason = `slot '${slot}' of target '${target}' not found (job id=${id})`
jobsResults.set(id, {
result: JOB_IGNORED,
reason
})
this.logger.error(`${LOGPREFIX} ${reason}`)
continue
}
this.logger.debug(`${LOGPREFIX} accepted target='${target}', slot='${slot}', id=${id}`)
this.logger.debug(`${LOGPREFIX} accepted target='${target}', id=${id}`)
jobsResults.set(id, {
result: JOB_ACCEPTED,
target,
slot
target
})
}
@ -400,8 +376,8 @@ class Worker extends EventEmitter {
if (result !== JOB_ACCEPTED)
continue
const {slot, target} = jobResult
this.enqueueJob(id, target, slot)
const {target} = jobResult
this.enqueueJob(id, target)
}
return {
@ -415,10 +391,9 @@ class Worker extends EventEmitter {
*
* @param {int} id
* @param {string} target
* @param {string} slot
*/
enqueueJob(id, target, slot) {
const queue = this.targets[target].slots[slot]
enqueueJob(id, target) {
const queue = this.targets[target].queue
queue.push(async (cb) => {
let data = {
code: null,
@ -556,37 +531,33 @@ class Worker extends EventEmitter {
* @param {string[]} inTargets
* @returns {boolean}
*/
hasFreeSlots(inTargets = []) {
const LOGPREFIX = `hasFreeSlots(${JSON.stringify(inTargets)}):`
hasFreeTargets(inTargets = []) {
const LOGPREFIX = `hasFreeTargets(${JSON.stringify(inTargets)}):`
this.logger.debug(`${LOGPREFIX} entered`)
for (const target in this.targets) {
if (!inTargets.includes(target))
if (!this.targets.hasOwnProperty(target) || !inTargets.includes(target))
continue
for (const slot in this.targets[target].slots) {
const queue = this.targets[target].slots[slot]
this.logger.debug(LOGPREFIX, queue.concurrency, queue.length)
const {paused, queue} = this.targets[target]
this.logger.trace(LOGPREFIX, target, queue.concurrency, queue.length)
if (queue.length < queue.concurrency)
return true
}
}
return false
}
/**
* @param {string} target
* @param {string} slot
*/
onJobFinished = (target, slot) => {
this.logger.debug(`onJobFinished: target=${target}, slot=${slot}`)
onJobFinished = (target) => {
this.logger.debug(`onJobFinished: target=${target}`)
const targetPaused = this.targets[target].paused
const queue = this.targets[target].slots[slot]
if (!targetPaused && queue.length < queue.concurrency && this.hasPollTarget(target)) {
const {paused, queue} = this.targets[target]
if (!paused && queue.length < queue.concurrency && this.hasPollTarget(target)) {
this.logger.debug(`onJobFinished: ${queue.length} < ${queue.concurrency}, calling poll(${target})`)
this.poll()
}