wip
This commit is contained in:
parent
0026ec6256
commit
2d6bb82787
75
src/home/telegram/config.py
Normal file
75
src/home/telegram/config.py
Normal file
@ -0,0 +1,75 @@
|
||||
from ..config import ConfigUnit
|
||||
from typing import Optional, Union
|
||||
from abc import ABC
|
||||
from enum import Enum
|
||||
|
||||
|
||||
class TelegramUserListType(Enum):
|
||||
USERS = 'users'
|
||||
NOTIFY = 'notify_users'
|
||||
|
||||
|
||||
class TelegramUserIdsConfig(ConfigUnit):
|
||||
NAME = 'telegram_user_ids'
|
||||
|
||||
@staticmethod
|
||||
def schema() -> Optional[dict]:
|
||||
return {
|
||||
'type': 'dict',
|
||||
'schema': {'type': 'int'}
|
||||
}
|
||||
|
||||
|
||||
_user_ids_config = TelegramUserIdsConfig()
|
||||
|
||||
|
||||
def _user_id_mapper(user: Union[str, int]) -> int:
|
||||
if isinstance(user, int):
|
||||
return user
|
||||
return _user_ids_config[user]
|
||||
|
||||
|
||||
class TelegramChatsConfig(ConfigUnit):
|
||||
NAME = 'telegram_chats'
|
||||
|
||||
@staticmethod
|
||||
def schema() -> Optional[dict]:
|
||||
return {
|
||||
'type': 'dict',
|
||||
'schema': {
|
||||
'id': {'type': 'string', 'required': True},
|
||||
'token': {'type': 'string', 'required': True},
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
class TelegramBotConfig(ConfigUnit, ABC):
|
||||
@staticmethod
|
||||
def schema() -> Optional[dict]:
|
||||
return {
|
||||
'bot': {
|
||||
'type': 'dict',
|
||||
'schema': {
|
||||
'token': {'type': 'string', 'required': True},
|
||||
TelegramUserListType.USERS: {**TelegramBotConfig._userlist_schema(), 'required': True},
|
||||
TelegramUserListType.NOTIFY: TelegramBotConfig._userlist_schema(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
def _userlist_schema() -> dict:
|
||||
return {'type': 'list', 'schema': {'type': ['string', 'int']}}
|
||||
|
||||
@staticmethod
|
||||
def custom_validator(data):
|
||||
for ult in TelegramUserListType:
|
||||
users = data['bot'][ult.value]
|
||||
for user in users:
|
||||
if isinstance(user, str):
|
||||
if user not in _user_ids_config:
|
||||
raise ValueError(f'user {user} not found in {TelegramUserIdsConfig.NAME}')
|
||||
|
||||
def get_user_ids(self,
|
||||
ult: TelegramUserListType = TelegramUserListType.USERS) -> list[int]:
|
||||
return list(map(_user_id_mapper, self['bot'][ult.value]))
|
@ -4,6 +4,7 @@ import re
|
||||
import datetime
|
||||
import json
|
||||
import itertools
|
||||
import sys
|
||||
|
||||
from inverterd import Format, InverterError
|
||||
from html import escape
|
||||
@ -12,6 +13,7 @@ from typing import Optional, Tuple, Union
|
||||
from home.util import chunks
|
||||
from home.config import config, AppConfigUnit
|
||||
from home.telegram import bot
|
||||
from home.telegram.config import TelegramBotConfig, TelegramUserListType
|
||||
from home.inverter import (
|
||||
wrapper_instance as inverter,
|
||||
beautify_table,
|
||||
@ -29,6 +31,12 @@ from home.api.types import BotType
|
||||
from home.api import WebAPIClient
|
||||
from telegram import ReplyKeyboardMarkup, InlineKeyboardMarkup, InlineKeyboardButton
|
||||
|
||||
|
||||
if __name__ != '__main__':
|
||||
print(f'this script can not be imported as module', file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
db = None
|
||||
LT = escape('<=')
|
||||
flags_map = {
|
||||
@ -44,16 +52,11 @@ flags_map = {
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class InverterBotConfig(AppConfigUnit):
|
||||
class InverterBotConfig(AppConfigUnit, TelegramBotConfig):
|
||||
NAME = 'inverter_bot'
|
||||
|
||||
@staticmethod
|
||||
def schema() -> Optional[dict]:
|
||||
userlist_schema = {
|
||||
'type': 'list',
|
||||
'empty': False,
|
||||
'schema': {'type': 'integer'}
|
||||
}
|
||||
acmode_item_schema = {
|
||||
'thresholds': {
|
||||
'type': 'list',
|
||||
@ -68,15 +71,7 @@ class InverterBotConfig(AppConfigUnit):
|
||||
}
|
||||
|
||||
return {
|
||||
'bot': {
|
||||
'type': 'dict',
|
||||
'required': True,
|
||||
'schema': {
|
||||
'token': {'type': 'string'},
|
||||
'users': userlist_schema,
|
||||
'notify_users': userlist_schema
|
||||
}
|
||||
},
|
||||
**super(TelegramBotConfig).schema(),
|
||||
'ac_mode': {
|
||||
'type': 'dict',
|
||||
'required': True,
|
||||
|
@ -1,5 +1,6 @@
|
||||
#!/usr/bin/env python3
|
||||
import os.path
|
||||
|
||||
from time import sleep
|
||||
from typing import Optional
|
||||
from argparse import ArgumentParser, ArgumentError
|
||||
@ -22,7 +23,7 @@ if __name__ == '__main__':
|
||||
parser.add_argument('--switch-relay', choices=[0, 1], type=int,
|
||||
help='send relay state')
|
||||
parser.add_argument('--push-ota', type=str, metavar='OTA_FILENAME',
|
||||
help='push ota, argument receives filename')
|
||||
help='push OTA, receives path to firmware.bin')
|
||||
|
||||
config.load_app(parser=parser, no_config=True)
|
||||
arg = parser.parse_args()
|
||||
|
@ -1,17 +1,53 @@
|
||||
#!/usr/bin/env python3
|
||||
import sys
|
||||
|
||||
from enum import Enum
|
||||
from typing import Optional
|
||||
from typing import Optional, Union
|
||||
from telegram import ReplyKeyboardMarkup
|
||||
from functools import partial
|
||||
|
||||
from home.config import config
|
||||
from home.config import config, AppConfigUnit
|
||||
from home.telegram import bot
|
||||
from home.mqtt import MqttPayload, MqttNode, MqttWrapper
|
||||
from home.telegram.config import TelegramBotConfig
|
||||
from home.mqtt import MqttPayload, MqttNode, MqttWrapper, MqttModule
|
||||
from home.mqtt import MqttNodesConfig
|
||||
from home.mqtt.module.relay import MqttRelayModule, MqttRelayState
|
||||
from home.mqtt.module.diagnostics import InitialDiagnosticsPayload, DiagnosticsPayload
|
||||
|
||||
|
||||
config.load_app('relay_mqtt_bot')
|
||||
if __name__ != '__main__':
|
||||
print(f'this script can not be imported as module', file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
mqtt_nodes_config = MqttNodesConfig()
|
||||
|
||||
|
||||
class RelayMqttBotConfig(AppConfigUnit, TelegramBotConfig):
|
||||
NAME = 'relay_mqtt_bot'
|
||||
|
||||
@staticmethod
|
||||
def schema() -> Optional[dict]:
|
||||
return {
|
||||
**super(TelegramBotConfig).schema(),
|
||||
'relay_nodes': {
|
||||
'type': 'list',
|
||||
'required': True,
|
||||
'schema': {
|
||||
'type': 'string'
|
||||
}
|
||||
},
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
def custom_validator(data):
|
||||
relay_node_names = mqtt_nodes_config.get_nodes(filters=('relay',), only_names=True)
|
||||
for node in data['relay_nodes']:
|
||||
if node not in relay_node_names:
|
||||
raise ValueError(f'unknown relay node "{node}"')
|
||||
|
||||
|
||||
config.load_app(RelayMqttBotConfig)
|
||||
|
||||
bot.initialize()
|
||||
bot.lang.ru(
|
||||
@ -35,9 +71,8 @@ status_emoji = {
|
||||
}
|
||||
|
||||
|
||||
# mqtt_relay: Optional[MqttRelayModule] = None
|
||||
mqtt: Optional[MqttWrapper] = None
|
||||
relay_nodes: dict[str, MqttRelayModule] = {}
|
||||
relay_nodes: dict[str, Union[MqttRelayModule, MqttModule]] = {}
|
||||
relay_states: dict[str, MqttRelayState] = {}
|
||||
|
||||
|
||||
@ -88,30 +123,29 @@ def markup(ctx: Optional[bot.Context]) -> Optional[ReplyKeyboardMarkup]:
|
||||
return ReplyKeyboardMarkup(buttons, one_time_keyboard=False)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
devices = []
|
||||
mqtt = MqttWrapper(client_id='relay_mqtt_bot')
|
||||
for device_id, data in config['relays'].items():
|
||||
mqtt_node = MqttNode(node_id=device_id, node_secret=data['secret'])
|
||||
relay_nodes[device_id] = mqtt_node.load_module('relay')
|
||||
mqtt_node.add_payload_callback(on_mqtt_message)
|
||||
mqtt.add_node(mqtt_node)
|
||||
devices = []
|
||||
mqtt = MqttWrapper(client_id='relay_mqtt_bot')
|
||||
for device_id, data in config['relays'].items():
|
||||
mqtt_node = MqttNode(node_id=device_id, node_secret=data['secret'])
|
||||
relay_nodes[device_id] = mqtt_node.load_module('relay')
|
||||
mqtt_node.add_payload_callback(on_mqtt_message)
|
||||
mqtt.add_node(mqtt_node)
|
||||
|
||||
labels = data['labels']
|
||||
bot.lang.ru(**{device_id: labels['ru']})
|
||||
bot.lang.en(**{device_id: labels['en']})
|
||||
labels = data['labels']
|
||||
bot.lang.ru(**{device_id: labels['ru']})
|
||||
bot.lang.en(**{device_id: labels['en']})
|
||||
|
||||
type_emoji = type_emojis[data['type']]
|
||||
type_emoji = type_emojis[data['type']]
|
||||
|
||||
for action in UserAction:
|
||||
messages = []
|
||||
for _lang, _label in labels.items():
|
||||
messages.append(f'{type_emoji}{status_emoji[action.value]} {labels[_lang]}')
|
||||
bot.handler(texts=messages)(partial(enable_handler if action == UserAction.ON else disable_handler, device_id))
|
||||
for action in UserAction:
|
||||
messages = []
|
||||
for _lang, _label in labels.items():
|
||||
messages.append(f'{type_emoji}{status_emoji[action.value]} {labels[_lang]}')
|
||||
bot.handler(texts=messages)(partial(enable_handler if action == UserAction.ON else disable_handler, device_id))
|
||||
|
||||
mqtt.configure_tls()
|
||||
mqtt.connect_and_loop(loop_forever=False)
|
||||
mqtt.configure_tls()
|
||||
mqtt.connect_and_loop(loop_forever=False)
|
||||
|
||||
bot.run(start_handler=start)
|
||||
bot.run(start_handler=start)
|
||||
|
||||
mqtt.disconnect()
|
||||
mqtt.disconnect()
|
||||
|
Loading…
x
Reference in New Issue
Block a user