telegram notifications improved

This commit is contained in:
Evgeny Zinoviev 2021-06-06 00:40:49 +03:00
parent c7612eba96
commit 80deea3930
2 changed files with 35 additions and 19 deletions

View File

@ -31,14 +31,19 @@ a list of ports expected to be open.
You can also set per-server `concurrency` and `timeout`. You can also set per-server `concurrency` and `timeout`.
```yaml ```yaml
server-1: telegram:
chat-id: -987
token: yourtoken
servers:
server-1:
host: 1.2.3.4 host: 1.2.3.4
opened: opened:
- 22 - 22
- 80 - 80
- 443 - 443
server-2: server-2:
host: 5.6.7.8 host: 5.6.7.8
opened: [] opened: []
concurrency: 1000 concurrency: 1000

View File

@ -3,6 +3,7 @@ import logging
import yaml import yaml
import math import math
from pprint import pprint
from argparse import ArgumentParser from argparse import ArgumentParser
from ch1p import telegram_notify from ch1p import telegram_notify
from threading import Thread, Lock from threading import Thread, Lock
@ -41,7 +42,7 @@ class Results:
opened.append(port) opened.append(port)
if not worker.is_expected(port): if not worker.is_expected(port):
self.warnings.append(f'On {worker.name} ({host}): port {port} is open') self.warnings.append(f'<b>{worker.name}</b> ({host}): port {port} is open')
print(f' {Colored.RED}{port} opened{Colored.END}') print(f' {Colored.RED}{port} opened{Colored.END}')
else: else:
print(f' {Colored.GREEN}{port} opened{Colored.END}') print(f' {Colored.GREEN}{port} opened{Colored.END}')
@ -50,15 +51,18 @@ class Results:
for port in worker.opened: for port in worker.opened:
if port not in opened: if port not in opened:
self.warnings.append( self.warnings.append(
f'On {worker.name} ({host}): port {port} expected to be opened, but it\'s not') f'<b>{worker.name}</b> ({host}): port {port} is NOT open')
print(f' {Colored.RED}{port} not opened{Colored.END}') print(f' {Colored.RED}{port} not opened{Colored.END}')
print() print()
def has_warnings(self): def has_warnings(self):
return len(self.warnings) > 0 return len(self.warnings) > 0
def notify(self): def notify(self, chat_id=None, token=None):
telegram_notify(escape('\n'.join(self.warnings)), parse_mode='html') text = '<b>❗️ Attention!</b>\n\n'
text += '\n'.join(self.warnings)
telegram_notify(text, parse_mode='html', chat_id=chat_id, token=token)
class Worker(Thread): class Worker(Thread):
@ -109,15 +113,21 @@ def main():
help='just print results, don\'t send to telegram') help='just print results, don\'t send to telegram')
args = parser.parse_args() args = parser.parse_args()
# setup loggign
logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=(logging.DEBUG if args.verbose else logging.INFO)) level=(logging.DEBUG if args.verbose else logging.INFO))
# load config
with open(args.config, 'r') as f: with open(args.config, 'r') as f:
config = yaml.safe_load(f) config = yaml.safe_load(f)
# pprint(config)
if not isinstance(config, dict): assert isinstance(config, dict)
raise TypeError('invalid config') assert 'servers' in config
if not args.no_telegram:
assert 'telegram' in config
# let's go
results = Results() results = Results()
max_threads = math.inf if args.threads_limit == 0 else args.threads_limit max_threads = math.inf if args.threads_limit == 0 else args.threads_limit
active_threads = 1 active_threads = 1
@ -129,7 +139,7 @@ def main():
return n return n
workers = [] workers = []
for name, data in config.items(): for name, data in config['servers'].items():
w = Worker(name, data['host'], data['opened'], w = Worker(name, data['host'], data['opened'],
concurrency=int(data['concurrency']) if 'concurrency' in data else args.concurrency, concurrency=int(data['concurrency']) if 'concurrency' in data else args.concurrency,
timeout=int(data['timeout']) if 'timeout' in data else args.timeout) timeout=int(data['timeout']) if 'timeout' in data else args.timeout)
@ -156,7 +166,8 @@ def main():
results.add(cw) results.add(cw)
if results.has_warnings() and not args.no_telegram: if results.has_warnings() and not args.no_telegram:
results.notify() results.notify(chat_id=config['telegram']['chat-id'],
token=config['telegram']['token'])
if __name__ == '__main__': if __name__ == '__main__':