initial commit

This commit is contained in:
Evgeny Zinoviev 2021-02-24 03:59:25 +03:00
commit 5e7d34458a
14 changed files with 1868 additions and 0 deletions

2
.gitignore vendored Normal file
View File

@ -0,0 +1,2 @@
.idea
node_modules

44
README.md Normal file
View File

@ -0,0 +1,44 @@
# jobd
**jobd** is a simple job queue daemon written in Node.JS. It uses MySQL
table as a storage.
## Installation
To be written
## Usage
To be written
## MySQL setup
Table scheme. You can add additional fields if you need.
```
CREATE TABLE `jobs` (
`id` int(10) UNSIGNED NOT NULL AUTO_INCREMENT,
`target` char(16) NOT NULL,
`slot` char(16) DEFAULT NULL,
`time_created` int(10) UNSIGNED NOT NULL,
`time_started` int(10) UNSIGNED NOT NULL DEFAULT 0,
`time_finished` int(10) UNSIGNED NOT NULL DEFAULT 0,
`status` enum('waiting','manual','accepted','running','done','ignored') NOT NULL DEFAULT 'waiting',
`result` enum('ok','fail') DEFAULT NULL,
`return_code` tinyint(3) UNSIGNED DEFAULT NULL,
`sig` char(10) DEFAULT NULL,
`stdout` mediumtext DEFAULT NULL,
`stderr` mediumtext DEFAULT NULL,
PRIMARY KEY (`id`),
KEY `status_target_idx` (`status`, `target`, `id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
```
You can turn `target` and `slot` to `ENUM`, for optimization.
## License
BSD-2c

12
jobd-master.conf.example Normal file
View File

@ -0,0 +1,12 @@
; server settings
host = 0.0.0.0
port = 13597
;password =
ping_interval = 30 ; seconds
poke_throttle_interval = 0.5 ; seconds
; logging
log_file = /tmp/jobd-master.log
log_level_file = info
log_level_console = debug

38
jobd.conf.example Normal file
View File

@ -0,0 +1,38 @@
; server settings
host = 0.0.0.0
port = 13596
;password =
master_host = 127.0.0.1
master_port = 13597
master_reconnect_timeout = 10
; log
log_file = /tmp/jobd.log
log_level_file = info
log_level_console = debug
; mysql settings
mysql_host = 10.211.55.6
mysql_port = 3306
mysql_user = jobd
mysql_password = password
mysql_database = jobd
mysql_table = jobs
mysql_fetch_limit = 10
; launcher command template
launcher = php /Users/ch1p/jobd-launcher.php --id {id}
max_output_buffer = 16777216
;
; targets
;
[server1]
low = 5
normal = 5
high = 5
[global]
normal = 3

34
package.json Normal file
View File

@ -0,0 +1,34 @@
{
"name": "jobd",
"version": "1.0.0",
"description": "job queue daemon",
"main": "src/jobd",
"homepage": "https://github.com/gch1p/jobd#readme",
"bugs": {
"url" : "https://github.com/gch1p/jobd/issues",
"email": "me@ch1p.io"
},
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"bin": {
"jobd": "./src/jobd.js",
"jobd-master": "./src/jobd-master.js"
},
"keywords": [],
"author": "Evgeny Zinoviev",
"license": "BSD-2-Clause",
"os": [
"darwin",
"linux"
],
"dependencies": {
"ini": "^2.0.0",
"lodash": "^4.17.21",
"log4js": "^6.3.0",
"minimist": "^1.2.5",
"mysql": "^2.18.1",
"promise-mysql": "^5.0.2",
"queue": "^6.0.2"
}
}

119
src/config.js Normal file
View File

@ -0,0 +1,119 @@
const fs = require('fs')
const ini = require('ini')
const {isNumeric} = require('./util')
let workerConfig = {
targets: {},
}
let masterConfig = {}
function readFile(file) {
if (!fs.existsSync(file))
throw new Error(`file ${file} not found`)
return ini.parse(fs.readFileSync(file, 'utf-8'))
}
function processScheme(source, scheme) {
const result = {}
for (let key in scheme) {
let opts = scheme[key]
let ne = !(key in source) || !source[key]
if (opts.required === true && ne)
throw new Error(`'${key}' is not defined`)
let value = source[key] ?? opts.default ?? null
switch (opts.type) {
case 'int':
if (!isNumeric(value))
throw new Error(`'${key}' must be an integer`)
value = parseInt(value, 10)
break
case 'float':
if (!isNumeric(value))
throw new Error(`'${key}' must be a float`)
value = parseFloat(value)
break
}
result[key] = value
}
return result
}
function parseWorkerConfig(file) {
const raw = readFile(file)
const scheme = {
host: {required: true},
port: {required: true, type: 'int'},
password: {},
master_host: {},
master_port: {type: 'int', default: 0},
master_reconnect_timeout: {type: 'int', default: 10},
log_file: {},
log_level_file: {default: 'warn'},
log_level_console: {default: 'warn'},
mysql_host: {required: true},
mysql_port: {required: true, type: 'int'},
mysql_user: {required: true},
mysql_password: {required: true},
mysql_database: {required: true},
mysql_table: {required: true, default: 'jobs'},
mysql_fetch_limit: {default: 100, type: 'int'},
launcher: {required: true},
max_output_buffer: {default: 1024*1024, type: 'int'},
}
Object.assign(workerConfig, processScheme(raw, scheme))
// targets
for (let target in raw) {
if (target === 'null')
throw new Error('word \'null\' is reserved, please don\'t use it as a target name')
if (typeof raw[target] !== 'object')
continue
workerConfig.targets[target] = {slots: {}}
for (let slotName in raw[target]) {
let slotLimit = parseInt(raw[target][slotName], 10)
if (slotLimit < 1)
throw new Error(`${target}: slot ${slotName} has invalid limit`)
workerConfig.targets[target].slots[slotName] = slotLimit
}
}
}
function parseMasterConfig(file) {
const raw = readFile(file)
const scheme = {
host: {required: true},
port: {required: true, type: 'int'},
password: {},
ping_interval: {default: 30, type: 'int'},
poke_throttle_interval: {default: 0.5, type: 'float'},
log_file: {},
log_level_file: {default: 'warn'},
log_level_console: {default: 'warn'},
}
Object.assign(masterConfig, processScheme(raw, scheme))
}
module.exports = {
parseWorkerConfig,
parseMasterConfig,
workerConfig,
masterConfig
}

49
src/db.js Normal file
View File

@ -0,0 +1,49 @@
const {workerConfig} = require('./config')
const {getLogger} = require('./logger')
const mysql = require('promise-mysql')
let link
const logger = getLogger('db')
async function init() {
link = await mysql.createConnection({
host: workerConfig.mysql_host,
user: workerConfig.mysql_user,
password: workerConfig.mysql_password,
database: workerConfig.mysql_database
})
}
function wrap(method, isAsync = true, log = true) {
return isAsync ? async function(...args) {
if (log)
logger.trace(`${method}: `, args)
try {
return await link[method](...args)
} catch (error) {
logger.error(`db.${method}:`, error, link)
if ( error.code === 'PROTOCOL_ENQUEUE_AFTER_FATAL_ERROR'
|| error.code === 'PROTOCOL_CONNECTION_LOST'
|| error.fatal === true) {
// try to reconnect and call it again, once
await init()
return await link[method](...args)
}
}
} : function(...args) {
if (log)
logger.trace(`${method}: `, args)
return link[method](...args)
}
}
module.exports = {
init,
query: wrap('query'),
beginTransaction: wrap('beginTransaction'),
commit: wrap('commit'),
escape: wrap('escape', false, false)
}

153
src/jobd-master.js Executable file
View File

@ -0,0 +1,153 @@
#!/usr/bin/env node
const minimist = require('minimist')
const loggerModule = require('./logger')
const configModule = require('./config')
const {Server, ResponseMessage, RequestMessage} = require('./server')
const WorkersList = require('./workers-list')
const {masterConfig} = configModule
/**
* @type {Logger}
*/
let logger
/**
* @type {Server}
*/
let server
/**
* @type WorkersList
*/
let workers
main().catch(e => {
console.error(e)
process.exit(1)
})
async function main() {
if (process.argv.length < 3) {
usage()
process.exit(0)
}
process.on('SIGINT', term)
process.on('SIGTERM', term)
const argv = minimist(process.argv.slice(2))
if (!argv.config)
throw new Error('--config option is required')
// read config
try {
configModule.parseMasterConfig(argv.config)
} catch (e) {
console.error(`config parsing error: ${e.message}`)
process.exit(1)
}
await loggerModule.init({
file: masterConfig.log_file,
levelFile: masterConfig.log_level_file,
levelConsole: masterConfig.log_level_console,
})
logger = loggerModule.getLogger('jobd-master')
// console.log(masterConfig)
workers = new WorkersList()
// start server
server = new Server()
server.on('message', onMessage)
server.start(masterConfig.port, masterConfig.host)
logger.info('server started')
}
/**
* @param {RequestMessage|ResponseMessage} message
* @param {Connection} connection
* @return {Promise<*>}
*/
async function onMessage({message, connection}) {
try {
if (!(message instanceof RequestMessage)) {
logger.debug('ignoring message', message)
return
}
if (message.requestType !== 'ping')
logger.info('onMessage:', message)
if (masterConfig.password && message.password !== masterConfig.password) {
connection.send(new ResponseMessage().setError('invalid password'))
return connection.close()
}
switch (message.requestType) {
case 'ping':
connection.send(new ResponseMessage().setError('pong'))
break
case 'register-worker': {
const targets = message.requestData?.targets || []
if (!targets.length) {
connection.send(new ResponseMessage().setError(`targets are empty`))
break
}
workers.add(connection, targets)
connection.send(new ResponseMessage().setData('ok'))
break
}
case 'poke': {
const targets = message.requestData?.targets || []
if (!targets.length) {
connection.send(new ResponseMessage().setError(`targets are empty`))
break
}
workers.poke(targets)
connection.send(new ResponseMessage().setData('ok'))
break
}
case 'status':
const info = workers.getInfo()
connection.send(new ResponseMessage().setData({
workers: info,
memoryUsage: process.memoryUsage()
}))
break
default:
connection.send(new ResponseMessage().setError(`unknown request type: '${message.requestType}'`))
break
}
} catch (error) {
logger.error(`error while handling message:`, message, error)
connection.send(new ResponseMessage().setError('server error: ' + error?.message))
}
}
function usage() {
let s = `${process.argv[1]} OPTIONS
Options:
--config <path>`
console.log(s)
}
function term() {
if (logger)
logger.info('shutdown')
loggerModule.shutdown(function() {
process.exit()
})
}

244
src/jobd.js Executable file
View File

@ -0,0 +1,244 @@
#!/usr/bin/env node
const minimist = require('minimist')
const loggerModule = require('./logger')
const configModule = require('./config')
const db = require('./db')
const {Server, Connection, RequestMessage, ResponseMessage} = require('./server')
const {Worker, STATUS_MANUAL} = require('./worker')
const {workerConfig} = configModule
/**
* @type {Worker}
*/
let worker
/**
* @type {Logger}
*/
let logger
/**
* @type {Server}
*/
let server
/**
* @type {object.<string, Connection>}
*/
let jobDoneAwaiters = {}
main().catch(e => {
console.error(e)
process.exit(1)
})
async function main() {
if (process.argv.length < 3) {
usage()
process.exit(0)
}
process.on('SIGINT', term)
process.on('SIGTERM', term)
const argv = minimist(process.argv.slice(2))
if (!argv.config)
throw new Error('--config option is required')
// read config
try {
configModule.parseWorkerConfig(argv.config)
} catch (e) {
console.error(`config parsing error: ${e.message}`)
process.exit(1)
}
await loggerModule.init({
file: workerConfig.log_file,
levelFile: workerConfig.log_level_file,
levelConsole: workerConfig.log_level_console,
})
logger = loggerModule.getLogger('jobd')
// console.log(workerConfig)
// init database
try {
await db.init()
} catch (error) {
logger.error('failed to connect to MySQL', error)
process.exit(1)
}
logger.info('db initialized')
// init queue
worker = new Worker()
for (let targetName in workerConfig.targets) {
let slots = workerConfig.targets[targetName].slots
// let target = new Target({name: targetName})
// queue.addTarget(target)
for (let slotName in slots) {
let slotLimit = slots[slotName]
worker.addSlot(targetName, slotName, slotLimit)
}
}
worker.on('job-done', (data) => {
if (jobDoneAwaiters[data.id] !== undefined) {
jobDoneAwaiters[data.id].send(new ResponseMessage().setData(data))
jobDoneAwaiters[data.id].close()
delete jobDoneAwaiters[data.id]
}
})
logger.info('queue initialized')
// start server
server = new Server()
server.on('message', onMessage)
server.start(workerConfig.port, workerConfig.host)
logger.info('server started')
// connect to master
if (workerConfig.master_port && workerConfig.master_host)
connectToMaster()
}
/**
* @param {RequestMessage|ResponseMessage} message
* @param {Connection} connection
* @return {Promise<*>}
*/
async function onMessage({message, connection}) {
try {
if (!(message instanceof RequestMessage)) {
logger.debug('ignoring message', message)
return
}
if (message.requestType !== 'ping')
logger.info('onMessage:', message)
if (workerConfig.password && message.password !== workerConfig.password) {
connection.send(new ResponseMessage().setError('invalid password'))
return connection.close()
}
switch (message.requestType) {
case 'ping':
connection.send(new ResponseMessage().setData('pong'))
break
case 'poll':
const targets = message.requestData?.targets || []
if (!targets.length) {
connection.send(new ResponseMessage().setError('empty targets'))
break
}
for (const t of targets) {
if (!worker.hasTarget(t)) {
connection.send(new ResponseMessage().setError(`invalid target '${t}'`))
break
}
}
worker.setPollTargets(targets)
worker.poll()
connection.send(new ResponseMessage().setData('ok'));
break
case 'status':
const qs = worker.getStatus()
connection.send(
new ResponseMessage().setData({
queue: qs,
jobDoneAwaitersCount: Object.keys(jobDoneAwaiters).length,
memoryUsage: process.memoryUsage()
})
)
break
case 'run-manual':
const {id} = message.requestData
if (id in jobDoneAwaiters) {
connection.send(new ResponseMessage().setError('another client is already waiting this job'))
break
}
jobDoneAwaiters[id] = connection
const {accepted} = await worker.getTasks(null, STATUS_MANUAL, {id})
if (!accepted) {
delete jobDoneAwaiters[id]
connection.send(new ResponseMessage().setError('failed to run task')) // would be nice to provide some error...
}
break
default:
connection.send(new ResponseMessage().setError(`unknown request type: '${message.requestType}'`))
break
}
} catch (error) {
logger.error(`error while handling message:`, message, error)
connection.send(new ResponseMessage().setError('server error: ' + error?.message))
}
}
function connectToMaster() {
const connection = new Connection()
connection.connect(workerConfig.master_host, workerConfig.master_port)
connection.on('connect', function() {
connection.send(
new RequestMessage('register-worker', {
targets: worker.getTargets()
})
)
})
connection.on('close', () => {
logger.warn(`connectToMaster: connection closed`)
setTimeout(() => {
connectToMaster()
}, workerConfig.master_reconnect_timeout * 1000)
})
connection.on('message', (message) => {
if (!(message instanceof RequestMessage)) {
logger.debug('message from master is not a request, hmm... skipping', message)
return
}
onMessage({message, connection})
.catch((error) => {
logger.error('connectToMaster: onMessage:', error)
})
})
}
function usage() {
let s = `${process.argv[1]} OPTIONS
Options:
--config <path>`
console.log(s)
}
function term() {
if (logger)
logger.info('shutdown')
loggerModule.shutdown(function() {
process.exit()
})
}

97
src/logger.js Normal file
View File

@ -0,0 +1,97 @@
const log4js = require('log4js')
const fs = require('fs/promises')
const fsConstants = require('fs').constants
const util = require('util')
module.exports = {
/**
* @param {string} file
* @param {string} levelFile
* @param {string} levelConsole
*/
async init({file, levelFile, levelConsole}) {
const categories = {
default: {
appenders: ['stdout-filter'],
level: 'trace'
}
}
const appenders = {
stdout: {
type: 'stdout',
level: 'trace'
},
'stdout-filter': {
type: 'logLevelFilter',
appender: 'stdout',
level: levelConsole
}
}
if (file) {
let exists
try {
await fs.stat(file)
exists = true
} catch (error) {
exists = false
}
// if file exists
if (exists) {
// see if it's writable
try {
// this promise fullfills with undefined upon success
await fs.access(file, fsConstants.W_OK)
} catch (error) {
throw new Error(`file '${file}' is not writable:` + error.message)
}
} else {
// try to create an empty file
let fd
try {
fd = await fs.open(file, 'wx')
} catch (error) {
throw new Error(`failed to create file '${file}': ` + error.message)
} finally {
await fd?.close()
}
}
categories.default.appenders.push('file-filter')
appenders.file = {
type: 'file',
filename: file,
maxLogSize: 1024 * 1024 * 50,
debug: 'debug'
}
appenders['file-filter'] = {
type: 'logLevelFilter',
appender: 'file',
level: levelFile
}
}
log4js.configure({
appenders,
categories
})
},
/**
* @return {Logger}
*/
getLogger(...args) {
return log4js.getLogger(...args)
},
/**
* @param cb
*/
shutdown(cb) {
log4js.shutdown(cb)
},
Logger: log4js.Logger,
}

450
src/server.js Normal file
View File

@ -0,0 +1,450 @@
const net = require('net')
const EventEmitter = require('events')
const {getLogger} = require('./logger')
const isObject = require('lodash/isObject')
const EOT = 0x04
class Message {
static REQUEST = 0
static RESPONSE = 1
/**
* @param {number} type
*/
constructor(type) {
/**
* @type {number}
*/
this.type = type
}
getAsObject() {
return [this.type]
}
}
class ResponseMessage extends Message {
constructor() {
super(Message.RESPONSE)
this.error = null
this.data = null
}
setError(error) {
this.error = error
return this
}
setData(data) {
this.data = data
return this
}
getAsObject() {
return [
...super.getAsObject(),
[
this.error,
this.data
]
]
}
}
class RequestMessage extends Message {
/**
* @param {string} type
* @param {any} data
*/
constructor(type, data = null) {
super(Message.REQUEST)
/**
* @type string
*/
this.requestType = type
/**
* @type any
*/
this.requestData = data
/**
* @type {null|string}
*/
this.password = null
}
getAsObject() {
let request = {
type: this.requestType
}
if (this.requestData)
request.data = this.requestData
return [
...super.getAsObject(),
request
]
}
/**
* @param {string} password
*/
setPassword(password) {
this.password = password
}
}
class Server extends EventEmitter {
constructor() {
super()
/**
* @type {null|module:net.Server}
*/
this.server = null
/**
* @type {Logger}
*/
this.logger = getLogger('server')
}
/**
* @param {number} port
* @param {string} host
*/
start(port, host) {
this.server = net.createServer()
this.server.on('connection', this.onConnection)
this.server.on('error', this.onError)
this.server.on('listening', this.onListening)
this.server.listen(port, host)
}
/**
* @param {module:net.Socket} socket
*/
onConnection = (socket) => {
let connection = new Connection()
connection.setSocket(socket)
connection.on('message', (message) => {
this.emit('message', {
message,
connection
})
})
this.logger.info(`new connection from ${socket.remoteAddress}:${socket.remotePort}`)
}
onListening = () => {
let addr = this.server.address()
this.logger.info(`server is listening on ${addr.address}:${addr.port}`)
}
onError = (error) => {
this.logger.error('error: ', error)
}
}
class Connection extends EventEmitter {
constructor() {
super()
/**
* @type {null|module:net.Socket}
*/
this.socket = null
/**
* @type {Buffer}
*/
this.data = Buffer.from([])
/**
* @type {boolean}
* @private
*/
this._closeEmitted = false
/**
* @type {null|string}
*/
this.remoteAddress = null
/**
* @type {null|number}
*/
this.remotePort = null
/**
* @type {null|number}
*/
this.id = null
this._setLogger()
}
/**
* @param {string} host
* @param {number} port
*/
connect(host, port) {
if (this.socket !== null)
throw new Error(`this Connection already has a socket`)
this.socket = new net.Socket()
this.socket.connect({host, port})
this.remoteAddress = host
this.remotePort = port
this._setId()
this._setLogger()
this._setSocketEvents()
}
/**
* @param {module:net.Socket} socket
*/
setSocket(socket) {
this.socket = socket
this.remoteAddress = socket.remoteAddress
this.remotePort = socket.remotePort
this._setId()
this._setLogger()
this._setSocketEvents()
}
/**
* @private
*/
_setLogger() {
let addr = this.socket ? this.remoteAddr() : '?'
this.logger = getLogger(`<Connection ${this.id} ${addr}>`)
}
/**
* @private
*/
_setId() {
this.id = Math.floor(Math.random() * 10000)
}
/**
* @private
*/
_setSocketEvents() {
this.socket.on('connect', this.onConnect)
this.socket.on('data', this.onData)
this.socket.on('end', this.onEnd)
this.socket.on('close', this.onClose)
this.socket.on('error', this.onError)
}
/**
* @param {Buffer} data
* @private
*/
_appendToBuffer(data) {
this.data = Buffer.concat([this.data, data])
}
/**
* @return {string}
*/
remoteAddr() {
return this.remoteAddress + ':' + this.remotePort
}
/**
* @private
*/
_processChunks() {
if (!this.data.length)
return
this.logger.trace(`processChunks (start):`, this.data)
/**
* @type {Buffer[]}
*/
let messages = []
let offset = 0
let eotPos
do {
eotPos = this.data.indexOf(EOT, offset)
if (eotPos !== -1) {
let message = this.data.slice(offset, eotPos)
messages.push(message)
this.logger.debug(`processChunks: found new message (${offset}, ${eotPos})`)
offset = eotPos + 1
}
} while (eotPos !== -1 && offset < this.data.length-1)
if (offset !== 0) {
this.data = this.data.slice(offset)
this.logger.trace(`processChunks: slicing data from ${offset}`)
}
this.logger.trace(`processChunks (after parsing):`, this.data)
for (let message of messages) {
try {
let buf = message.toString('utf-8')
this.logger.debug(buf)
let json = JSON.parse(buf)
this._emitMessage(json)
} catch (error) {
this.logger.error('failed to parse data as JSON')
this.logger.debug(message)
}
}
}
/**
* @param {object} json
* @private
*/
_emitMessage(json) {
if (!Array.isArray(json)) {
this.logger.error('malformed message, JSON array expected', json)
return
}
let type = json.shift()
let message
switch (type) {
case Message.REQUEST: {
let data = json.shift()
if (!data || !isObject(data)) {
this.logger.error('malformed REQUEST message')
return
}
message = new RequestMessage(data.type, data.data || null)
if (data.password)
message.setPassword(data.password)
break
}
case Message.RESPONSE: {
let data = json.shift()
if (!data || !Array.isArray(data) || data.length < 2) {
this.logger.error('malformed RESPONSE message')
return
}
message = new ResponseMessage()
message.setError(data[0]).setData(data[1])
break
}
default:
this.logger.error(`malformed message, unexpected type ${type}`)
return
}
this.emit('message', message)
}
/**
* @type {Message} data
* @param message
*/
send(message) {
if (!(message instanceof Message))
throw new Error('send expects Message, got', message)
let json = JSON.stringify(message.getAsObject())
let buf = Buffer.concat([
Buffer.from(json),
Buffer.from([EOT])
])
this.logger.debug('send:', json)
this.logger.trace('send:', buf)
try {
this.socket.write(buf)
} catch (error) {
this.logger.error(`processChunks: failed to write response ${JSON.stringify(message)} to a socket`, error)
}
}
/**
*/
close() {
try {
this.socket.end()
this.socket.destroy()
this._emitClose()
} catch (error) {
this.logger.error('close:', error)
}
}
/**
* @private
*/
_emitClose() {
if (this._closeEmitted)
return
this._closeEmitted = true
this.emit('close')
}
onConnect = () => {
this.logger.debug('connection established')
this.emit('connect')
}
onData = (data) => {
this.logger.trace('onData', data)
this._appendToBuffer(data)
this._processChunks()
}
onEnd = (data) => {
if (data)
this._appendToBuffer(data)
this._processChunks()
}
onClose = (hadError) => {
this._emitClose()
this.logger.debug(`socket closed` + (hadError ? ` with error` : ''))
}
onError = (error) => {
this._emitClose()
this.logger.warn(`socket error:`, error)
}
}
module.exports = {
Server,
Connection,
RequestMessage,
ResponseMessage
}

9
src/util.js Normal file
View File

@ -0,0 +1,9 @@
module.exports = {
timestamp() {
return parseInt(+(new Date())/1000)
},
isNumeric(n) {
return !isNaN(parseFloat(n)) && isFinite(n)
}
}

472
src/worker.js Normal file
View File

@ -0,0 +1,472 @@
const Queue = require('queue')
const child_process = require('child_process')
const db = require('./db')
const {timestamp} = require('./util')
const {getLogger} = require('./logger')
const EventEmitter = require('events')
const {workerConfig} = require('./config')
const STATUS_WAITING = 'waiting'
const STATUS_MANUAL = 'manual'
const STATUS_ACCEPTED = 'accepted'
const STATUS_IGNORED = 'ignored'
const STATUS_RUNNING = 'running'
const STATUS_DONE = 'done'
const RESULT_OK = 'ok'
const RESULT_FAIL = 'fail'
class Worker extends EventEmitter {
constructor() {
super()
/**
* @type {object.<string, {slots: object.<string, {limit: number, queue: Queue}>}>}
*/
this.targets = {}
/**
* @type {boolean}
*/
this.polling = false
/**
* @type {boolean}
*/
this.nextpoll = {}
/**
* @type {Logger}
*/
this.logger = getLogger('Worker')
}
/**
* @param {string} target
* @param {string} slot
* @param {number} limit
*/
addSlot(target, slot, limit) {
this.logger.debug(`addSlot: adding slot '${slot}' for target' ${target}' (limit: ${limit})`)
if (this.targets[target] === undefined)
this.targets[target] = {slots: {}}
if (this.targets[target].slots[slot] !== undefined)
throw new Error(`slot ${slot} for target ${target} has already been added`)
let queue = Queue({
concurrency: limit,
autostart: true
})
queue.on('success', this.onJobFinished.bind(this, target, slot))
queue.on('error', this.onJobFinished.bind(this, target, slot))
queue.start()
this.targets[target].slots[slot] = {limit, queue}
}
/**
* @param {string} target
* @returns {boolean}
*/
hasTarget(target) {
return (target in this.targets)
}
/**
* Returns status of all queues.
*
* @return {object}
*/
getStatus() {
let status = {targets: {}}
for (const targetName in this.targets) {
let target = this.targets[targetName]
status.targets[targetName] = {}
for (const slotName in target.slots) {
const {queue, limit} = target.slots[slotName]
status.targets[targetName][slotName] = {
concurrency: queue.concurrency,
limit,
length: queue.length,
}
}
}
return status
}
/**
* @return {string[]}
*/
getTargets() {
return Object.keys(this.targets)
}
/**
*
*/
poll() {
const LOGPREFIX = `poll():`
let targets = this.getPollTargets()
if (!targets.length) {
this.poller.warn(`${LOGPREFIX} no targets`)
return
}
// skip and postpone the poll, if we're in the middle on another poll
// it will be called again from the last .then() at the end of this method
if (this.polling) {
this.logger.debug(`${LOGPREFIX} already polling`)
return
}
// skip and postpone the poll, if no free slots
// it will be called again from onJobFinished()
if (!this.hasFreeSlots(targets)) {
this.logger.debug(`${LOGPREFIX} no free slots`)
return
}
// set polling flag
this.polling = true
// clear postponed polls target list
this.setPollTargets()
this.logger.debug(`${LOGPREFIX} calling getTasks(${JSON.stringify(targets)})`)
this.getTasks(targets)
.then(({rows}) => {
let message = `${LOGPREFIX} ${rows} processed`
if (workerConfig.mysql_fetch_limit && rows >= workerConfig.mysql_fetch_limit) {
// it seems, there are more, so we'll need to perform another query
this.setPollTargets(targets)
message += `, scheduling more polls (targets: ${JSON.stringify(this.getPollTargets())})`
}
this.logger.debug(message)
})
.catch((error) => {
this.logger.error(`${LOGPREFIX}`, error)
//this.setPollTargets(targets)
})
.then(() => {
// unset polling flag
this.polling = false
// perform another poll, if needed
if (this.getPollTargets().length > 0) {
this.logger.debug(`${LOGPREFIX} next poll scheduled, calling poll() again`)
this.poll()
}
})
}
/**
* @param {string|string[]|null} target
*/
setPollTargets(target) {
// when called without parameter, remove all targets
if (target === undefined) {
this.nextpoll = {}
return
}
// just a fix
if (target === 'null')
target = null
if (Array.isArray(target)) {
target.forEach(t => {
this.nextpoll[t] = true
})
} else {
if (target === null)
this.nextpoll = {}
this.nextpoll[target] = true
}
}
/**
* @return {string[]}
*/
getPollTargets() {
if (null in this.nextpoll)
return Object.keys(this.targets)
return Object.keys(this.nextpoll)
}
/**
* @param {string} target
* @return {boolean}
*/
hasPollTarget(target) {
return target in this.nextpoll || null in this.nextpoll
}
/**
* @param {string|null|string[]} target
* @param {string} reqstatus
* @param {object} data
* @returns {Promise<{ignored: number, accepted: number, rows: number}>}
*/
async getTasks(target = null, reqstatus = STATUS_WAITING, data = {}) {
const LOGPREFIX = `getTasks(${JSON.stringify(target)}, '${reqstatus}', ${JSON.stringify(data)}):`
// get new jobs in transaction
await db.beginTransaction()
let sqlFields = `id, status, target, slot`
let sql
if (data.id) {
sql = `SELECT ${sqlFields} FROM ${workerConfig.mysql_table} WHERE id=${db.escape(data.id)} FOR UPDATE`
} else {
let targets
if (target === null) {
targets = Object.keys(this.targets)
} else if (!Array.isArray(target)) {
targets = [target]
} else {
targets = target
}
let sqlLimit = workerConfig.mysql_fetch_limit !== 0 ? ` LIMIT 0, ${workerConfig.mysql_fetch_limit}` : ''
let sqlWhere = `status=${db.escape(reqstatus)} AND target IN (`+targets.map(db.escape).join(',')+`)`
sql = `SELECT ${sqlFields} FROM ${workerConfig.mysql_table} WHERE ${sqlWhere} ORDER BY id ${sqlLimit} FOR UPDATE`
}
/** @type {object[]} results */
let results = await db.query(sql)
this.logger.trace(`${LOGPREFIX} query result:`, results)
/**
* @type {{target: string, slot: string, id: number}[]}
*/
let accepted = []
/**
* @type {number[]}
*/
let ignored = []
for (let result of results) {
let {id, slot, target, status} = result
id = parseInt(id)
if (status !== reqstatus) {
this.logger.warn(`${LOGPREFIX} status = ${status} != ${reqstatus}`)
ignored.push(id)
continue
}
if (!target || this.targets[target] === undefined) {
this.logger.error(`${LOGPREFIX} target '${target}' not found (job id=${id})`)
ignored.push(id)
continue
}
if (!slot || this.targets[target].slots[slot] === undefined) {
this.logger.error(`${LOGPREFIX} slot '${slot}' of target '${target}' not found (job id=${id})`)
ignored.push(id)
continue
}
this.logger.debug(`${LOGPREFIX} accepted target='${target}', slot='${slot}', id=${id}`)
accepted.push({target, slot, id})
}
if (accepted.length)
await db.query(`UPDATE ${workerConfig.mysql_table} SET status='accepted' WHERE id IN (`+accepted.map(j => j.id).join(',')+`)`)
if (ignored.length)
await db.query(`UPDATE ${workerConfig.mysql_table} SET status='ignored' WHERE id IN (`+ignored.join(',')+`)`)
await db.commit()
accepted.forEach(({id, target, slot}) => {
let q = this.targets[target].slots[slot].queue
q.push(async (cb) => {
let data = {
code: null,
signal: null,
stdout: '',
stderr: ''
}
let result = RESULT_OK
try {
await this.setJobStatus(id, STATUS_RUNNING)
Object.assign(data, (await this.run(id)))
if (data.code !== 0)
result = RESULT_FAIL
} catch (error) {
this.logger.error(`${LOGPREFIX} job ${id}: error while run():`, error)
result = RESULT_FAIL
data.stderr = (error instanceof Error) ? (error.message + '\n' + error.stack) : (error + '')
} finally {
this.emit('job-done', {
id,
result,
...data
})
try {
await this.setJobStatus(id, STATUS_DONE, result, data)
} catch (error) {
this.logger.error(`${LOGPREFIX} setJobStatus(${id})`, error)
}
cb()
}
})
})
return {
rows: results.length,
accepted: accepted.length,
ignored: ignored.length,
}
}
/**
* @param {number} id
*/
async run(id) {
let command = workerConfig.launcher.replace(/\{id\}/g, id)
let args = command.split(/ +/)
return new Promise((resolve, reject) => {
this.logger.info(`run(${id}): launching`, args)
let process = child_process.spawn(args[0], args.slice(1), {
maxBuffer: workerConfig.max_output_buffer
})
let stdoutChunks = []
let stderrChunks = []
process.on('exit',
/**
* @param {null|number} code
* @param {null|string} signal
*/
(code, signal) => {
let stdout = stdoutChunks.join('')
let stderr = stderrChunks.join('')
stdoutChunks = undefined
stderrChunks = undefined
resolve({
code,
signal,
stdout,
stderr
})
})
process.on('error', (error) => {
reject(error)
})
process.stdout.on('data', (data) => {
if (data instanceof Buffer)
data = data.toString('utf-8')
stdoutChunks.push(data)
})
process.stderr.on('data', (data) => {
if (data instanceof Buffer)
data = data.toString('utf-8')
stderrChunks.push(data)
})
})
}
/**
* @param {number} id
* @param {string} status
* @param {string} result
* @param {object} data
* @return {Promise<void>}
*/
async setJobStatus(id, status, result = RESULT_OK, data = {}) {
let update = {
status,
result
}
switch (status) {
case STATUS_RUNNING:
case STATUS_DONE:
update[status === STATUS_RUNNING ? 'time_started' : 'time_finished'] = timestamp()
break
}
if (data.code !== undefined)
update.return_code = data.code
if (data.signal !== undefined)
update.sig = data.signal
if (data.stderr !== undefined)
update.stderr = data.stderr
if (data.stdout !== undefined)
update.stdout = data.stdout
let list = []
for (let field in update) {
let val = update[field]
if (val !== null)
val = db.escape(val)
list.push(`${field}=${val}`)
}
await db.query(`UPDATE ${workerConfig.mysql_table} SET ${list.join(', ')} WHERE id=?`, [id])
}
/**
* @param {string[]} inTargets
* @returns {boolean}
*/
hasFreeSlots(inTargets = []) {
const LOGPREFIX = `hasFreeSlots(${JSON.stringify(inTargets)}):`
this.logger.debug(`${LOGPREFIX} entered`)
for (const target in this.targets) {
if (!inTargets.includes(target))
continue
for (const slot in this.targets[target].slots) {
const {limit, queue} = this.targets[target].slots[slot]
this.logger.debug(LOGPREFIX, limit, queue.length)
if (queue.length < limit)
return true
}
}
return false
}
/**
* @param {string} target
* @param {string} slot
*/
onJobFinished = (target, slot) => {
this.logger.debug(`onJobFinished: target=${target}, slot=${slot}`)
const {queue, limit} = this.targets[target].slots[slot]
if (queue.length < limit && this.hasPollTarget(target)) {
this.logger.debug(`onJobFinished: ${queue.length} < ${limit}, calling poll(${target})`)
this.poll()
}
}
}
module.exports = {
Worker,
STATUS_WAITING,
STATUS_MANUAL,
STATUS_ACCEPTED,
STATUS_IGNORED,
STATUS_RUNNING,
STATUS_DONE,
}

145
src/workers-list.js Normal file
View File

@ -0,0 +1,145 @@
const intersection = require('lodash/intersection')
const {masterConfig} = require('./config')
const {getLogger} = require('./logger')
const {RequestMessage} = require('./server')
const throttle = require('lodash/throttle')
class WorkersList {
constructor() {
/**
* @type {{connection: Connection, targets: string[]}[]}
*/
this.workers = []
/**
* @type {object.<string, boolean>}
*/
this.targetsToPoke = {}
/**
* @type {object.<string, boolean>}
*/
this.targetsWaitingToPoke = {}
/**
* @type {NodeJS.Timeout}
*/
this.pingInterval = setInterval(this.sendPings, masterConfig.ping_interval * 1000)
/**
* @type {Logger}
*/
this.logger = getLogger('WorkersList')
}
/**
* @param {Connection} connection
* @param {string[]} targets
*/
add(connection, targets) {
this.logger.info(`add: connection from ${connection.remoteAddr()}, targets ${JSON.stringify(targets)}`)
this.workers.push({connection, targets})
connection.on('close', () => {
this.logger.info(`connection from ${connection.remoteAddr()} closed, removing worker`)
this.workers = this.workers.filter(worker => {
return worker.connection !== connection
})
})
let waiting = Object.keys(this.targetsWaitingToPoke)
if (!waiting.length)
return
let intrs = intersection(waiting, targets)
if (intrs.length) {
this.logger.info('add: found intersection with waiting targets:', intrs, 'going to poke new worker')
this._pokeWorkerConnection(connection, intrs)
for (let target of intrs)
delete this.targetsWaitingToPoke[target]
this.logger.trace(`add: this.targetsWaitingToPoke:`, this.targetsWaitingToPoke)
}
}
/**
* @param {string[]} targets
*/
poke(targets) {
this.logger.debug('poke:', targets)
if (!Array.isArray(targets))
throw new Error('targets must be Array')
for (let t of targets)
this.targetsToPoke[t] = true
this._pokeWorkers()
}
/**
* @private
*/
_pokeWorkers = throttle(() => {
const targets = Object.keys(this.targetsToPoke)
this.targetsToPoke = {}
const found = {}
for (const worker of this.workers) {
const intrs = intersection(worker.targets, targets)
intrs.forEach(t => {
found[t] = true
})
if (intrs.length > 0)
this._pokeWorkerConnection(worker.connection, targets)
}
for (let target of targets) {
if (!(target in found)) {
this.logger.debug(`_pokeWorkers: worker responsible for ${target} not found. we'll remember it`)
this.targetsWaitingToPoke[target] = true
}
this.logger.trace('_pokeWorkers: this.targetsWaitingToPoke:', this.targetsWaitingToPoke)
}
}, masterConfig.poke_throttle_interval * 1000, {leading: true})
/**
* @param {Connection} connection
* @param {string[]} targets
* @private
*/
_pokeWorkerConnection(connection, targets) {
this.logger.debug('_pokeWorkerConnection:', connection.remoteAddr(), targets)
connection.send(
new RequestMessage('poll', {
targets
})
)
}
/**
* @return {{targets: string[], remoteAddr: string, remotePort: number}[]}
*/
getInfo() {
return this.workers.map(worker => {
return {
remoteAddr: worker.connection.socket?.remoteAddress,
remotePort: worker.connection.socket?.remotePort,
targets: worker.targets
}
})
}
/**
* @private
*/
sendPings = () => {
this.workers
.forEach(w => {
this.logger.trace(`sending ping to ${w.connection.remoteAddr()}`)
w.connection.send(new RequestMessage('ping'))
})
}
}
module.exports = WorkersList