improvements, see full commit message
- added new option 'always_allow_localhost' - improve jobctl, allow connecting to password-protected instances without using configuration file
This commit is contained in:
parent
fc004fe70a
commit
f7ca888651
@ -2,6 +2,7 @@
|
|||||||
host = 0.0.0.0
|
host = 0.0.0.0
|
||||||
port = 7081
|
port = 7081
|
||||||
;password =
|
;password =
|
||||||
|
always_allow_localhost = 0
|
||||||
|
|
||||||
ping_interval = 30 ; seconds
|
ping_interval = 30 ; seconds
|
||||||
poke_throttle_interval = 0.5 ; seconds
|
poke_throttle_interval = 0.5 ; seconds
|
||||||
|
@ -2,6 +2,7 @@
|
|||||||
host = 0.0.0.0
|
host = 0.0.0.0
|
||||||
port = 7080
|
port = 7080
|
||||||
;password =
|
;password =
|
||||||
|
always_allow_localhost = 0
|
||||||
|
|
||||||
master_host = 127.0.0.1
|
master_host = 127.0.0.1
|
||||||
master_port = 7081
|
master_port = 7081
|
||||||
|
117
src/jobctl.js
117
src/jobctl.js
@ -9,6 +9,7 @@ const fs = require('fs/promises')
|
|||||||
const {Connection, RequestMessage} = require('./lib/server')
|
const {Connection, RequestMessage} = require('./lib/server')
|
||||||
const {isNumeric} = require('./lib/util')
|
const {isNumeric} = require('./lib/util')
|
||||||
const columnify = require('columnify')
|
const columnify = require('columnify')
|
||||||
|
const readline = require('readline')
|
||||||
|
|
||||||
const DEFAULT_CONFIG_PATH = path.join(os.homedir(), '.jobctl.conf')
|
const DEFAULT_CONFIG_PATH = path.join(os.homedir(), '.jobctl.conf')
|
||||||
|
|
||||||
@ -102,7 +103,7 @@ async function initApp(appName) {
|
|||||||
process.on('SIGTERM', term)
|
process.on('SIGTERM', term)
|
||||||
|
|
||||||
const argv = minimist(process.argv.slice(2), {
|
const argv = minimist(process.argv.slice(2), {
|
||||||
boolean: ['master', 'version', 'help'],
|
boolean: ['master', 'version', 'help', 'password'],
|
||||||
string: ['host', 'port', 'config', 'log-level'],
|
string: ['host', 'port', 'config', 'log-level'],
|
||||||
stopEarly: true,
|
stopEarly: true,
|
||||||
default: {
|
default: {
|
||||||
@ -121,18 +122,32 @@ async function initApp(appName) {
|
|||||||
// read config
|
// read config
|
||||||
if (await exists(argv.config)) {
|
if (await exists(argv.config)) {
|
||||||
try {
|
try {
|
||||||
config.parseJobctlConfig(argv.config, {
|
config.parseJobctlConfig(argv.config)
|
||||||
master: argv.master,
|
|
||||||
log_level: argv['log-level'],
|
|
||||||
host: argv.host,
|
|
||||||
port: parseInt(argv.port, 10),
|
|
||||||
})
|
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
console.error(`config parsing error: ${e.message}`)
|
console.error(`config parsing error: ${e.message}`)
|
||||||
process.exit(1)
|
process.exit(1)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (argv.master || config.get('master') === null)
|
||||||
|
config.set('master', argv.master)
|
||||||
|
|
||||||
|
for (let key of ['log-level', 'host', 'port']) {
|
||||||
|
if (key in argv)
|
||||||
|
config.set(key.replace('-', '_'), argv[key])
|
||||||
|
}
|
||||||
|
|
||||||
|
if (config.get('port') === null)
|
||||||
|
config.set('port', config.get('master') ? 7081 : 7080)
|
||||||
|
|
||||||
|
if (config.get('log_level') === null)
|
||||||
|
config.set('log_level', 'warn')
|
||||||
|
|
||||||
|
if (argv.password) {
|
||||||
|
let password = await question('Enter password: ')
|
||||||
|
config.set('password', password)
|
||||||
|
}
|
||||||
|
|
||||||
// init logger
|
// init logger
|
||||||
await loggerModule.init({
|
await loggerModule.init({
|
||||||
levelConsole: config.get('log_level'),
|
levelConsole: config.get('log_level'),
|
||||||
@ -167,9 +182,22 @@ async function initApp(appName) {
|
|||||||
return argv['_'] || []
|
return argv['_'] || []
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param {string} name
|
||||||
|
* @param {null|object} data
|
||||||
|
* @return {Promise<object>}
|
||||||
|
*/
|
||||||
|
async function request(name, data = null) {
|
||||||
|
let req = new RequestMessage(name, data)
|
||||||
|
let response = await connection.sendRequest(req)
|
||||||
|
if (response.error)
|
||||||
|
throw new Error(`Worker error: ${response.error}`)
|
||||||
|
return response.data
|
||||||
|
}
|
||||||
|
|
||||||
async function workerListTargets() {
|
async function workerListTargets() {
|
||||||
try {
|
try {
|
||||||
let response = await connection.sendRequest(new RequestMessage('status'))
|
let response = await request('status')
|
||||||
const rows = []
|
const rows = []
|
||||||
const columns = [
|
const columns = [
|
||||||
'target',
|
'target',
|
||||||
@ -177,12 +205,12 @@ async function workerListTargets() {
|
|||||||
'length',
|
'length',
|
||||||
'paused'
|
'paused'
|
||||||
]
|
]
|
||||||
for (const target in response.data.targets) {
|
for (const target in response.targets) {
|
||||||
const row = [
|
const row = [
|
||||||
target,
|
target,
|
||||||
response.data.targets[target].concurrency,
|
response.targets[target].concurrency,
|
||||||
response.data.targets[target].length,
|
response.targets[target].length,
|
||||||
response.data.targets[target].paused ? 'yes' : 'no'
|
response.targets[target].paused ? 'yes' : 'no'
|
||||||
]
|
]
|
||||||
rows.push(row)
|
rows.push(row)
|
||||||
}
|
}
|
||||||
@ -196,12 +224,12 @@ async function workerListTargets() {
|
|||||||
|
|
||||||
async function workerMemoryUsage() {
|
async function workerMemoryUsage() {
|
||||||
try {
|
try {
|
||||||
let response = await connection.sendRequest(new RequestMessage('status'))
|
let response = await request('status')
|
||||||
const columns = ['what', 'value']
|
const columns = ['what', 'value']
|
||||||
const rows = []
|
const rows = []
|
||||||
for (const what in response.data.memoryUsage)
|
for (const what in response.memoryUsage)
|
||||||
rows.push([what, response.data.memoryUsage[what]])
|
rows.push([what, response.memoryUsage[what]])
|
||||||
rows.push(['pendingJobPromises', response.data.jobPromisesCount])
|
rows.push(['pendingJobPromises', response.jobPromisesCount])
|
||||||
table(columns, rows)
|
table(columns, rows)
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
logger.error(error.message)
|
logger.error(error.message)
|
||||||
@ -232,16 +260,8 @@ async function workerSetTargetConcurrency(argv) {
|
|||||||
concurrency = parseInt(concurrency, 10)
|
concurrency = parseInt(concurrency, 10)
|
||||||
|
|
||||||
try {
|
try {
|
||||||
let response = await connection.sendRequest(
|
let response = await request('set-target-concurrency', {target, concurrency})
|
||||||
new RequestMessage('set-target-concurrency', {
|
console.log(response)
|
||||||
target, concurrency
|
|
||||||
})
|
|
||||||
)
|
|
||||||
|
|
||||||
if (response.error)
|
|
||||||
throw new Error(`Worker error: ${response.error}`)
|
|
||||||
|
|
||||||
console.log(response.data)
|
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
logger.error(error.message)
|
logger.error(error.message)
|
||||||
logger.trace(error)
|
logger.trace(error)
|
||||||
@ -254,11 +274,11 @@ async function masterPoke(argv) {
|
|||||||
|
|
||||||
async function masterMemoryUsage() {
|
async function masterMemoryUsage() {
|
||||||
try {
|
try {
|
||||||
let response = await connection.sendRequest(new RequestMessage('status'))
|
let response = await request('status')
|
||||||
const columns = ['what', 'value']
|
const columns = ['what', 'value']
|
||||||
const rows = []
|
const rows = []
|
||||||
for (const what in response.data.memoryUsage)
|
for (const what in response.memoryUsage)
|
||||||
rows.push([what, response.data.memoryUsage[what]])
|
rows.push([what, response.memoryUsage[what]])
|
||||||
table(columns, rows)
|
table(columns, rows)
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
logger.error(error.message)
|
logger.error(error.message)
|
||||||
@ -268,10 +288,10 @@ async function masterMemoryUsage() {
|
|||||||
|
|
||||||
async function masterListWorkers() {
|
async function masterListWorkers() {
|
||||||
try {
|
try {
|
||||||
let response = await connection.sendRequest(new RequestMessage('status', {poll_workers: true}))
|
let response = await request('status', {poll_workers: true})
|
||||||
const columns = ['worker', 'targets', 'concurrency', 'length', 'paused']
|
const columns = ['worker', 'targets', 'concurrency', 'length', 'paused']
|
||||||
const rows = []
|
const rows = []
|
||||||
for (const worker of response.data.workers) {
|
for (const worker of response.workers) {
|
||||||
let remoteAddr = `${worker.remoteAddr}:${worker.remotePort}`
|
let remoteAddr = `${worker.remoteAddr}:${worker.remotePort}`
|
||||||
let targets = Object.keys(worker.workerStatus.targets)
|
let targets = Object.keys(worker.workerStatus.targets)
|
||||||
let concurrencies = targets.map(t => worker.workerStatus.targets[t].concurrency)
|
let concurrencies = targets.map(t => worker.workerStatus.targets[t].concurrency)
|
||||||
@ -297,14 +317,8 @@ async function sendCommandForTargets(targets, command) {
|
|||||||
throw new Error('No targets specified.')
|
throw new Error('No targets specified.')
|
||||||
|
|
||||||
try {
|
try {
|
||||||
let response = await connection.sendRequest(
|
let response = await request(command, {targets})
|
||||||
new RequestMessage(command, {targets})
|
//console.log(response)
|
||||||
)
|
|
||||||
|
|
||||||
if (response.error)
|
|
||||||
throw new Error(`Worker error: ${response.error}`)
|
|
||||||
|
|
||||||
console.log(response.data)
|
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
logger.error(error.message)
|
logger.error(error.message)
|
||||||
logger.trace(error)
|
logger.trace(error)
|
||||||
@ -346,18 +360,14 @@ Options:
|
|||||||
--port Port. Default: 7080 when --master is not used,
|
--port Port. Default: 7080 when --master is not used,
|
||||||
7081 otherwise.
|
7081 otherwise.
|
||||||
--config <path> Path to config. Default: ~/.jobctl.conf
|
--config <path> Path to config. Default: ~/.jobctl.conf
|
||||||
Required for connecting to password-protected
|
--password Ask for a password before launching a command.
|
||||||
instances.
|
|
||||||
--log-level <level> 'error', 'warn', 'info', 'debug' or 'trace'.
|
--log-level <level> 'error', 'warn', 'info', 'debug' or 'trace'.
|
||||||
Default: warn
|
Default: warn
|
||||||
--help: Show this help.
|
--help: Show this help.
|
||||||
--version: Print version.
|
--version: Print version.
|
||||||
|
|
||||||
Configuration file
|
Configuration file
|
||||||
Config file is required for connecting to password-protected jobd instances.
|
Example of possible ~/.jobctl.conf file:
|
||||||
It can also be used to store hostname, port and log level.
|
|
||||||
|
|
||||||
Here's an example of possible ~/.jobctl.conf file:
|
|
||||||
|
|
||||||
;password =
|
;password =
|
||||||
hostname = 1.2.3.4
|
hostname = 1.2.3.4
|
||||||
@ -428,3 +438,20 @@ function table(columns, rows) {
|
|||||||
}
|
}
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param prompt
|
||||||
|
* @return {Promise<string>}
|
||||||
|
*/
|
||||||
|
function question(prompt) {
|
||||||
|
const rl = readline.createInterface({
|
||||||
|
input: process.stdin,
|
||||||
|
output: process.stdout
|
||||||
|
})
|
||||||
|
return new Promise((resolve, reject) => {
|
||||||
|
rl.question(prompt, (answer) => {
|
||||||
|
rl.close()
|
||||||
|
resolve(answer)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
}
|
||||||
|
@ -2,7 +2,7 @@ const fs = require('fs')
|
|||||||
const ini = require('ini')
|
const ini = require('ini')
|
||||||
const {isNumeric} = require('./util')
|
const {isNumeric} = require('./util')
|
||||||
|
|
||||||
let config = null
|
let config = {}
|
||||||
|
|
||||||
function readFile(file) {
|
function readFile(file) {
|
||||||
if (!fs.existsSync(file))
|
if (!fs.existsSync(file))
|
||||||
@ -22,32 +22,34 @@ function processScheme(source, scheme) {
|
|||||||
|
|
||||||
let value = source[key] ?? opts.default ?? null
|
let value = source[key] ?? opts.default ?? null
|
||||||
|
|
||||||
switch (opts.type) {
|
if (value !== null) {
|
||||||
case 'int':
|
switch (opts.type) {
|
||||||
if (!isNumeric(value))
|
case 'int':
|
||||||
throw new Error(`'${key}' must be an integer`)
|
if (!isNumeric(value))
|
||||||
value = parseInt(value, 10)
|
throw new Error(`'${key}' must be an integer`)
|
||||||
break
|
value = parseInt(value, 10)
|
||||||
|
break
|
||||||
|
|
||||||
case 'float':
|
case 'float':
|
||||||
if (!isNumeric(value))
|
if (!isNumeric(value))
|
||||||
throw new Error(`'${key}' must be a float`)
|
throw new Error(`'${key}' must be a float`)
|
||||||
value = parseFloat(value)
|
value = parseFloat(value)
|
||||||
break
|
break
|
||||||
|
|
||||||
case 'object':
|
case 'object':
|
||||||
if (typeof value !== 'object')
|
if (typeof value !== 'object')
|
||||||
throw new Error(`'${key}' must be an object`)
|
throw new Error(`'${key}' must be an object`)
|
||||||
break
|
break
|
||||||
|
|
||||||
case 'boolean':
|
case 'boolean':
|
||||||
if (value !== null) {
|
if (typeof value === 'string') {
|
||||||
value = value.trim()
|
value = value.trim()
|
||||||
value = ['true', '1'].includes(value)
|
value = ['true', '1'].includes(value)
|
||||||
} else {
|
} else {
|
||||||
value = false
|
value = !!value
|
||||||
}
|
}
|
||||||
break
|
break
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
result[key] = value
|
result[key] = value
|
||||||
@ -64,6 +66,7 @@ function parseWorkerConfig(file) {
|
|||||||
host: {required: true},
|
host: {required: true},
|
||||||
port: {required: true, type: 'int'},
|
port: {required: true, type: 'int'},
|
||||||
password: {},
|
password: {},
|
||||||
|
always_allow_localhost: {type: 'boolean', default: false},
|
||||||
|
|
||||||
master_host: {},
|
master_host: {},
|
||||||
master_port: {type: 'int', default: 0},
|
master_port: {type: 'int', default: 0},
|
||||||
@ -111,6 +114,7 @@ function parseMasterConfig(file) {
|
|||||||
host: {required: true},
|
host: {required: true},
|
||||||
port: {required: true, type: 'int'},
|
port: {required: true, type: 'int'},
|
||||||
password: {},
|
password: {},
|
||||||
|
always_allow_localhost: {type: 'boolean', default: false},
|
||||||
|
|
||||||
ping_interval: {default: 30, type: 'int'},
|
ping_interval: {default: 30, type: 'int'},
|
||||||
poke_throttle_interval: {default: 0.5, type: 'float'},
|
poke_throttle_interval: {default: 0.5, type: 'float'},
|
||||||
@ -124,14 +128,8 @@ function parseMasterConfig(file) {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* @param {string} file
|
* @param {string} file
|
||||||
* @param {{
|
|
||||||
* master: boolean,
|
|
||||||
* log_level: string|undefined,
|
|
||||||
* host: string,
|
|
||||||
* port: int,
|
|
||||||
* }} inputOptions
|
|
||||||
*/
|
*/
|
||||||
function parseJobctlConfig(file, inputOptions) {
|
function parseJobctlConfig(file) {
|
||||||
config = {}
|
config = {}
|
||||||
const raw = readFile(file)
|
const raw = readFile(file)
|
||||||
|
|
||||||
@ -141,17 +139,17 @@ function parseJobctlConfig(file, inputOptions) {
|
|||||||
log_level: {default: 'warn'},
|
log_level: {default: 'warn'},
|
||||||
}))
|
}))
|
||||||
|
|
||||||
if (inputOptions.master)
|
// if (inputOptions.master)
|
||||||
config.master = inputOptions.master
|
// config.master = inputOptions.master
|
||||||
Object.assign(config, processScheme(raw, {
|
Object.assign(config, processScheme(raw, {
|
||||||
host: {default: '127.0.0.1'},
|
host: {default: '127.0.0.1'},
|
||||||
port: {default: config.master ? 7081 : 7080, type: 'int'}
|
port: {/*default: config.master ? 7081 : 7080,*/ type: 'int'}
|
||||||
}))
|
}))
|
||||||
|
|
||||||
for (let key of ['log_level', 'host', 'port']) {
|
// for (let key of ['log_level', 'host', 'port']) {
|
||||||
if (inputOptions[key])
|
// if (inputOptions[key])
|
||||||
config[key] = inputOptions[key]
|
// config[key] = inputOptions[key]
|
||||||
}
|
// }
|
||||||
|
|
||||||
// console.log('parseJobctlConfig [2]', config)
|
// console.log('parseJobctlConfig [2]', config)
|
||||||
}
|
}
|
||||||
@ -164,26 +162,27 @@ function get(key = null) {
|
|||||||
if (key === null)
|
if (key === null)
|
||||||
return config
|
return config
|
||||||
|
|
||||||
if (typeof config !== 'object')
|
return config[key] || null
|
||||||
throw new Error(`config is not loaded`)
|
|
||||||
|
|
||||||
if (!(key in config))
|
|
||||||
throw new Error(`config: ${key} not found`)
|
|
||||||
|
|
||||||
return config[key]
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param {object} opts
|
* @param {object|string} key
|
||||||
|
* @param value
|
||||||
*/
|
*/
|
||||||
// function set(opts) {
|
function set(key, value) {
|
||||||
// Object.assign(config, opts)
|
if (!config)
|
||||||
// }
|
config = {}
|
||||||
|
if (typeof key === 'object') {
|
||||||
|
Object.assign(config, key)
|
||||||
|
} else {
|
||||||
|
config[key] = value
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
module.exports = {
|
module.exports = {
|
||||||
parseWorkerConfig,
|
parseWorkerConfig,
|
||||||
parseMasterConfig,
|
parseMasterConfig,
|
||||||
parseJobctlConfig,
|
parseJobctlConfig,
|
||||||
get,
|
get,
|
||||||
// set,
|
set,
|
||||||
}
|
}
|
@ -261,13 +261,7 @@ class Connection extends EventEmitter {
|
|||||||
* @type {boolean}
|
* @type {boolean}
|
||||||
* @private
|
* @private
|
||||||
*/
|
*/
|
||||||
this._isAuthorized = config.get('password') === ''
|
this._isAuthorized = !config.get('password')
|
||||||
|
|
||||||
/**
|
|
||||||
* @type {boolean}
|
|
||||||
* @private
|
|
||||||
*/
|
|
||||||
this._textConversationAllowed = false
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @type {boolean}
|
* @type {boolean}
|
||||||
@ -328,10 +322,8 @@ class Connection extends EventEmitter {
|
|||||||
this.remoteAddress = socket.remoteAddress
|
this.remoteAddress = socket.remoteAddress
|
||||||
this.remotePort = socket.remotePort
|
this.remotePort = socket.remotePort
|
||||||
|
|
||||||
if (this.remoteAddress === '127.0.0.1') {
|
if (this.remoteAddress === '127.0.0.1' && config.get('always_allow_localhost') === true)
|
||||||
this._isAuthorized = true
|
this._isAuthorized = true
|
||||||
this._textConversationAllowed = true
|
|
||||||
}
|
|
||||||
|
|
||||||
this._setLogger()
|
this._setLogger()
|
||||||
this._setSocketEvents()
|
this._setSocketEvents()
|
||||||
@ -548,7 +540,7 @@ class Connection extends EventEmitter {
|
|||||||
|
|
||||||
// send password once (when talking to jobd-master)
|
// send password once (when talking to jobd-master)
|
||||||
if (!this._isAuthorized) {
|
if (!this._isAuthorized) {
|
||||||
message.setPassword(config.get('password'))
|
message.setPassword(config.get('password') || '')
|
||||||
this._isAuthorized = true
|
this._isAuthorized = true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user