homekit/src/relay_mqtt_bot.py
Evgeny Zinoviev b26a622543 wip
2023-06-09 02:05:29 +03:00

155 lines
4.4 KiB
Python
Executable File

#!/usr/bin/env python3
import sys
from enum import Enum
from typing import Optional, Union
from telegram import ReplyKeyboardMarkup
from functools import partial
from home.config import config, AppConfigUnit
from home.telegram import bot
from home.telegram.config import TelegramBotConfig
from home.mqtt import MqttPayload, MqttNode, MqttWrapper, MqttModule
from home.mqtt import MqttNodesConfig
from home.mqtt.module.relay import MqttRelayModule, MqttRelayState
from home.mqtt.module.diagnostics import InitialDiagnosticsPayload, DiagnosticsPayload
# This file is an executable entry point only: when it is imported rather
# than run directly, report that on stderr and exit with status 1.
if __name__ != '__main__':
    print('this script can not be imported as module', file=sys.stderr)
    raise SystemExit(1)

# Registry of configured MQTT nodes; read by
# RelayMqttBotConfig.custom_validator to check relay node names.
mqtt_nodes_config = MqttNodesConfig()
class RelayMqttBotConfig(AppConfigUnit, TelegramBotConfig):
    """Configuration unit of the relay MQTT bot.

    Extends the Telegram bot schema with a required ``relay_nodes`` list of
    node names and checks those names against ``mqtt_nodes_config``.
    """

    NAME = 'relay_mqtt_bot'

    @staticmethod
    def schema() -> Optional[dict]:
        """Return the Telegram bot schema extended with ``relay_nodes``.

        :return: schema dict with the parent's entries plus a required
            ``relay_nodes`` list of strings.
        """
        # The one-argument form ``super(TelegramBotConfig)`` creates an
        # unbound super object that does not proxy attribute lookups, so
        # calling ``.schema()`` on it raises AttributeError. Ask the parent
        # class directly instead.
        # NOTE(review): assumes TelegramBotConfig.schema is callable on the
        # class (static/class method) like this one -- confirm. ``or {}``
        # covers a parent that returns None, as the Optional return type of
        # this method allows.
        parent_schema = TelegramBotConfig.schema() or {}
        return {
            **parent_schema,
            'relay_nodes': {
                'type': 'list',
                'required': True,
                'schema': {
                    'type': 'string'
                }
            },
        }

    @staticmethod
    def custom_validator(data):
        """Check that every configured relay node is a known relay node.

        :param data: loaded config mapping; its ``relay_nodes`` entry is read.
        :raises ValueError: on the first name in ``data['relay_nodes']`` that
            is not among the relay node names of ``mqtt_nodes_config``.
        """
        relay_node_names = mqtt_nodes_config.get_nodes(filters=('relay',), only_names=True)
        for node in data['relay_nodes']:
            if node not in relay_node_names:
                raise ValueError(f'unknown relay node "{node}"')

    def get_relay_name_translated(self, lang: str, relay_name: str) -> str:
        """Placeholder: not implemented yet, currently returns ``None``.

        TODO: return the display name of ``relay_name`` for language ``lang``.
        """
        pass
# Load this app's config unit, then initialize the bot framework.
config.load_app(RelayMqttBotConfig)
bot.initialize()

# Interface strings shared by all handlers, registered once per language
# under the same keys.
bot.lang.ru(**{
    'start_message': "Выберите команду на клавиатуре",
    'unknown_command': "Неизвестная команда",
    'done': "Готово 👌",
})
bot.lang.en(**{
    'start_message': "Select command on the keyboard",
    'unknown_command': "Unknown command",
    'done': "Done 👌",
})
# Emoji prefix per relay type, looked up by the ``type`` value of a relay's
# config entry.
type_emojis = {
    'lamp': '💡',
}

# Marker per UserAction value, placed between the type emoji and the relay
# label when button texts are built. The two values must be distinct and
# non-empty: with equal markers the "on" and "off" buttons of a relay get
# the same text, so the user cannot tell them apart and both handlers are
# registered on the same message.
# NOTE(review): the concrete emoji are a choice made here -- adjust to taste.
status_emoji = {
    'on': '✅',
    'off': '❌',
}
# MQTT client wrapper; stays None until the wrapper is created further down
# in the startup code.
mqtt: Optional[MqttWrapper] = None

# Relay module object per node id, filled while the nodes are set up and
# read by the enable/disable handlers.
relay_nodes: dict[str, Union[MqttRelayModule, MqttModule]] = dict()

# Relay state per node id, created and updated by on_mqtt_message.
relay_states: dict[str, MqttRelayState] = dict()
class UserAction(Enum):
    """Actions a user can request for a relay.

    The member values double as keys of the ``status_emoji`` table.
    """

    ON = 'on'
    OFF = 'off'
def on_mqtt_message(node: MqttNode,
                    message: MqttPayload):
    """Store relay state carried by diagnostics payloads.

    Payloads that are neither ``InitialDiagnosticsPayload`` nor
    ``DiagnosticsPayload`` are ignored. For the others, the entry of
    ``relay_states`` for ``node.id`` is created on first use and updated
    with ``rssi`` and ``enabled``; initial diagnostics also supply
    ``fw_version``.

    :param node: node the payload came from; only its ``id`` is used.
    :param message: received payload.
    """
    if not isinstance(message, (InitialDiagnosticsPayload, DiagnosticsPayload)):
        return

    fields = {'rssi': message.rssi, 'enabled': message.flags.state}
    if isinstance(message, InitialDiagnosticsPayload):
        # Only the initial diagnostics payload carries the firmware version.
        fields['fw_version'] = message.fw_version

    if node.id not in relay_states:
        relay_states[node.id] = MqttRelayState()
    state = relay_states[node.id]
    state.update(**fields)
def enable_handler(node_id: str, ctx: bot.Context) -> None:
    """Call ``switchpower(True)`` on the relay of ``node_id`` and confirm.

    :param node_id: key into ``relay_nodes``.
    :param ctx: bot context used to send the localized ``done`` reply.
    """
    relay = relay_nodes[node_id]
    relay.switchpower(True)
    confirmation = ctx.lang('done')
    ctx.reply(confirmation)
def disable_handler(node_id: str, ctx: bot.Context) -> None:
    """Call ``switchpower(False)`` on the relay of ``node_id`` and confirm.

    :param node_id: key into ``relay_nodes``.
    :param ctx: bot context used to send the localized ``done`` reply.
    """
    relay = relay_nodes[node_id]
    relay.switchpower(False)
    confirmation = ctx.lang('done')
    ctx.reply(confirmation)
def start(ctx: bot.Context) -> None:
    """Reply with the localized ``start_message`` string.

    Passed to ``bot.run`` as the start handler.
    """
    greeting = ctx.lang('start_message')
    ctx.reply(greeting)
@bot.exceptionhandler
def exception_handler(e: Exception, ctx: bot.Context) -> bool:
    """Exception hook registered with the bot framework; always returns False.

    Neither argument is inspected.
    NOTE(review): False presumably tells the framework the exception was not
    handled here -- confirm against ``bot.exceptionhandler``.
    """
    return False
@bot.defaultreplymarkup
def markup(ctx: Optional[bot.Context]) -> Optional[ReplyKeyboardMarkup]:
    """Build the default reply keyboard: one row per relay, one button per action.

    Each button text is ``<type emoji><status emoji> <label>``, with the label
    taken in the user's language. The same format is used when the text
    handlers are registered, so the two must stay in sync.

    :param ctx: bot context, or None when no context is available.
    :return: the keyboard, or None when ``ctx`` is None (the user's language
        is unknown then, so no labels can be chosen).
    """
    # ``ctx`` is declared Optional; without this guard a missing context
    # ends in AttributeError on ``ctx.user_lang``.
    if ctx is None:
        return None

    lang = ctx.user_lang
    buttons = []
    for data in config['relays'].values():
        label = data['labels'][lang]
        type_emoji = type_emojis[data['type']]
        buttons.append([
            f'{type_emoji}{status_emoji[action.value]} {label}'
            for action in UserAction
        ])
    return ReplyKeyboardMarkup(buttons, one_time_keyboard=False)
# Startup: create the MQTT wrapper, register one node per configured relay
# and hook up the Telegram text handlers for its on/off buttons.
mqtt = MqttWrapper(client_id='relay_mqtt_bot')
for device_id, data in config['relays'].items():
    mqtt_node = MqttNode(node_id=device_id, node_secret=data['secret'])
    relay_nodes[device_id] = mqtt_node.load_module('relay')
    mqtt_node.add_payload_callback(on_mqtt_message)
    mqtt.add_node(mqtt_node)

    # Register the relay's display name under its device id, per language.
    labels = data['labels']
    bot.lang.ru(**{device_id: labels['ru']})
    bot.lang.en(**{device_id: labels['en']})

    type_emoji = type_emojis[data['type']]
    for action in UserAction:
        # A button can arrive in any configured language, so the handler is
        # registered for the button text of every label. The format matches
        # the one used by markup().
        messages = [f'{type_emoji}{status_emoji[action.value]} {label}'
                    for label in labels.values()]
        handler = enable_handler if action == UserAction.ON else disable_handler
        bot.handler(texts=messages)(partial(handler, device_id))

mqtt.configure_tls()
mqtt.connect_and_loop(loop_forever=False)
try:
    bot.run(start_handler=start)
finally:
    # Disconnect from the broker even when bot.run() exits with an exception.
    mqtt.disconnect()