new config: port openwrt_log_analyzer

This commit is contained in:
Evgeny Zinoviev 2023-06-10 22:44:31 +03:00
parent 3790c22053
commit f3b9d50496
3 changed files with 58 additions and 42 deletions

View File

@ -2,25 +2,27 @@ import requests
import logging import logging
from typing import Tuple from typing import Tuple
from ..config import config from .config import TelegramChatsConfig
_chats = TelegramChatsConfig()
_logger = logging.getLogger(__name__) _logger = logging.getLogger(__name__)
def send_message(text: str, def send_message(text: str,
parse_mode: str = None, chat: str,
disable_web_page_preview: bool = False): parse_mode: str = 'HTML',
data, token = _send_telegram_data(text, parse_mode, disable_web_page_preview) disable_web_page_preview: bool = False,):
data, token = _send_telegram_data(text, chat, parse_mode, disable_web_page_preview)
req = requests.post('https://api.telegram.org/bot%s/sendMessage' % token, data=data) req = requests.post('https://api.telegram.org/bot%s/sendMessage' % token, data=data)
return req.json() return req.json()
def send_photo(filename: str): def send_photo(filename: str, chat: str):
chat_data = _chats[chat]
data = { data = {
'chat_id': config['telegram']['chat_id'], 'chat_id': chat_data['id'],
} }
token = config['telegram']['token'] token = chat_data['token']
url = f'https://api.telegram.org/bot{token}/sendPhoto' url = f'https://api.telegram.org/bot{token}/sendPhoto'
with open(filename, "rb") as fd: with open(filename, "rb") as fd:
@ -29,19 +31,19 @@ def send_photo(filename: str):
def _send_telegram_data(text: str, def _send_telegram_data(text: str,
chat: str,
parse_mode: str = None, parse_mode: str = None,
disable_web_page_preview: bool = False) -> Tuple[dict, str]: disable_web_page_preview: bool = False) -> Tuple[dict, str]:
chat_data = _chats[chat]
data = { data = {
'chat_id': config['telegram']['chat_id'], 'chat_id': chat_data['id'],
'text': text 'text': text
} }
if parse_mode is not None: if parse_mode is not None:
data['parse_mode'] = parse_mode data['parse_mode'] = parse_mode
elif 'parse_mode' in config['telegram']:
data['parse_mode'] = config['telegram']['parse_mode']
if disable_web_page_preview or 'disable_web_page_preview' in config['telegram']: if disable_web_page_preview:
data['disable_web_page_preview'] = 1 data['disable_web_page_preview'] = 1
return data, config['telegram']['token'] return data, chat_data['token']

View File

@ -36,6 +36,14 @@ def validate_ipv4_or_hostname(address: str, raise_exception: bool = False) -> bo
return False return False
def validate_mac_address(mac_address: str) -> bool:
mac_pattern = r'^([0-9A-Fa-f]{2}[:-]){5}([0-9A-Fa-f]{2})$'
if re.match(mac_pattern, mac_address):
return True
else:
return False
class Addr: class Addr:
host: str host: str
port: Optional[int] port: Optional[int]

View File

@ -1,33 +1,39 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import home.telegram as telegram import home.telegram as telegram
from home.config import config from home.telegram.config import TelegramChatsConfig
from home.util import validate_mac_address
from typing import Optional
from home.config import config, AppConfigUnit
from home.database import BotsDatabase, SimpleState from home.database import BotsDatabase, SimpleState
"""
config.toml example:
[simple_state] class OpenwrtLogAnalyzerConfig(AppConfigUnit):
file = "/home/user/.config/openwrt_log_analyzer/state.txt" @classmethod
def schema(cls) -> Optional[dict]:
return {
'database_name': {'type': 'string', 'required': True},
'devices': {
'type': 'dict',
'keysrules': {'type': 'string'},
'valuesrules': {
'type': 'string',
'check_with': validate_mac_address
}
},
'limit': {'type': 'integer'},
'telegram_chat': {'type': 'string'},
'aps': {
'type': 'list',
'schema': {'type': 'integer'}
}
}
[mysql] @staticmethod
host = "localhost" def custom_validator(data):
database = ".." chats = TelegramChatsConfig()
user = ".." if data['telegram_chat'] not in chats:
password = ".." return ValueError(f'unknown telegram chat {data["telegram_chat"]}')
[devices]
Device1 = "00:00:00:00:00:00"
Device2 = "01:01:01:01:01:01"
[telegram]
chat_id = ".."
token = ".."
parse_mode = "HTML"
[openwrt_log_analyzer]
limit = 10
"""
def main(mac: str, def main(mac: str,
@ -48,18 +54,18 @@ def main(mac: str,
max_id = log.id max_id = log.id
text = '\n'.join(map(lambda s: str(s), data)) text = '\n'.join(map(lambda s: str(s), data))
telegram.send_message(f'<b>{title} (AP #{ap})</b>\n\n' + text) telegram.send_message(f'<b>{title} (AP #{ap})</b>\n\n' + text, config.app_config['telegram_chat'])
return max_id return max_id
if __name__ == '__main__': if __name__ == '__main__':
config.load_app('openwrt_log_analyzer') config.load_app(OpenwrtLogAnalyzerConfig)
for ap in config['openwrt_log_analyzer']['aps']: for ap in config.app_config['aps']:
state_file = config['simple_state']['file'] dbname = config.app_config['database_name']
state_file = state_file.replace('.txt', f'-{ap}.txt') dbname = dbname.replace('.txt', f'-{ap}.txt')
state = SimpleState(name=state_file, state = SimpleState(name=dbname,
default={'last_id': 0}) default={'last_id': 0})
max_last_id = 0 max_last_id = 0