jobd-master: support poll_workers in status()

This commit is contained in:
Evgeny Zinoviev 2021-03-02 01:48:32 +03:00
parent 7307003e57
commit 1b803a258a
4 changed files with 59 additions and 11 deletions

View File

@ -70,7 +70,9 @@ For optimization purposes, you can turn fields `target` and `slot` into `ENUM`s.
* **`poke(targets: string[])`** — send `poll` requests to all registered workers that serve
specified `targets`.
* **`status()`** — returns list of registered workers and memory usage.
* **`status(poll_workers=false: bool)`** — returns list of registered workers and
memory usage. If `pollWorkers` is true, sends `status()` request to all registered
workers and includes their responses.
* **`run-manual(jobs: {id: int, target: string}[])`** — send `run-manual`
requests to registered jobd instances serving specified targets, and return

View File

@ -125,14 +125,18 @@ async function onRequestMessage(message, connection) {
}
case 'status':
const info = workers.getInfo()
const info = await workers.getInfo(message.requestData?.poll_workers || false)
let status = {
workers: info,
memoryUsage: process.memoryUsage()
}
connection.send(
new ResponseMessage(message.requestNo)
.setData({
workers: info,
memoryUsage: process.memoryUsage()
})
.setData(status)
)
break
default:

View File

@ -521,7 +521,7 @@ class Connection extends EventEmitter {
* Send request
*
* @param {RequestMessage} message
* @return {Promise}
* @return {Promise<ResponseMessage>}
*/
sendRequest(message) {
if (!(message instanceof RequestMessage))

View File

@ -110,24 +110,66 @@ class WorkersList {
*/
_pokeWorkerConnection(connection, targets) {
this.logger.debug('_pokeWorkerConnection:', connection.remoteAddr(), targets)
connection.sendRequest(
new RequestMessage('poll', {
targets
})
)
.then(error => {
this.logger.error('_pokeWorkerConnection:', error)
})
}
/**
* @return {{targets: string[], remoteAddr: string, remotePort: number}[]}
*/
getInfo() {
return this.workers.map(worker => {
return {
async getInfo(pollWorkers = false) {
const promises = []
const workers = [...this.workers]
for (let i = 0; i < workers.length; i++) {
let worker = workers[i]
let P
if (pollWorkers) {
P = worker.connection.sendRequest(new RequestMessage('status'))
} else {
P = Promise.resolve()
}
promises.push(P)
}
const results = await Promise.allSettled(promises)
let info = []
for (let i = 0; i < results.length; i++) {
const result = results[i]
const worker = workers[i]
const workerInfo = {
remoteAddr: worker.connection.socket?.remoteAddress,
remotePort: worker.connection.socket?.remotePort,
targets: worker.targets
}
})
if (pollWorkers) {
if (result.status === 'fulfilled') {
/**
* @type {ResponseMessage}
*/
let response = result.value
workerInfo.workerStatus = response.data
} else if (result.status === 'rejected') {
workerInfo.workerStatusError = result.reason?.message
}
}
info.push(workerInfo)
}
return info
}
/**