homekit/bin/relay_mqtt_bot.py
2024-02-27 00:01:50 +03:00

165 lines
4.9 KiB
Python
Executable File

#!/usr/bin/env python3
import sys
import include_homekit
from enum import Enum
from typing import Optional, Union
from telegram import ReplyKeyboardMarkup
from functools import partial
from homekit.config import config, AppConfigUnit, Translation
from homekit.telegram import bot
from homekit.telegram.config import TelegramBotConfig
from homekit.mqtt import MqttPayload, MqttNode, MqttWrapper, MqttModule, MqttNodesConfig
from homekit.mqtt.module.relay import MqttRelayModule, MqttRelayState
from homekit.mqtt.module.diagnostics import InitialDiagnosticsPayload, DiagnosticsPayload
# This file is an executable entry point: refuse to continue when imported.
if __name__ != '__main__':
    print('this script can not be imported as module', file=sys.stderr)
    sys.exit(1)

# Shared MQTT nodes configuration, used for validation and node lookups below.
mqtt_nodes_config = MqttNodesConfig()
class RelayMqttBotConfig(AppConfigUnit, TelegramBotConfig):
    """Configuration unit of the relay MQTT bot.

    Combines the Telegram bot schema with a required ``relay_nodes`` list and
    keeps the ``mqtt_nodes`` translation strings used to label relays.
    """

    NAME = 'relay_mqtt_bot'

    # Translation strings loaded from the 'mqtt_nodes' unit in __init__.
    _strings: Translation

    def __init__(self):
        super().__init__()
        self._strings = Translation('mqtt_nodes')

    @classmethod
    def schema(cls) -> Optional[dict]:
        """Return the ``TelegramBotConfig`` schema extended with ``relay_nodes``."""
        # The one-argument form super(TelegramBotConfig) yields an *unbound*
        # super object, so looking up .schema on it raises AttributeError.
        # Call the parent's schema explicitly instead.
        return {
            **TelegramBotConfig.schema(),
            'relay_nodes': {
                'type': 'list',
                'required': True,
                'schema': {
                    'type': 'string'
                }
            },
        }

    @staticmethod
    def custom_validator(data):
        """Check that every configured relay node is a known relay node.

        Raises:
            ValueError: if ``data['relay_nodes']`` contains a name that is not
                returned by ``mqtt_nodes_config.get_nodes`` for the ``relay``
                filter.
        """
        relay_node_names = mqtt_nodes_config.get_nodes(filters=('relay',), only_names=True)
        for node in data['relay_nodes']:
            if node not in relay_node_names:
                raise ValueError(f'unknown relay node "{node}"')

    def get_relay_name_translated(self, lang: str, relay_name: str) -> str:
        """Return the ``relay`` string of ``relay_name`` for language ``lang``."""
        return self._strings.get(lang)[relay_name]['relay']
# Load this app's configuration, then bring up the bot framework.
config.load_app(RelayMqttBotConfig)
bot.initialize()

# User-facing strings, registered per language.
bot.lang.ru(**{
    'start_message': "Выберите команду на клавиатуре",
    'unknown_command': "Неизвестная команда",
    'done': "Готово 👌",
})
bot.lang.en(**{
    'start_message': "Select command on the keyboard",
    'unknown_command': "Unknown command",
    'done': "Done 👌",
})
# Emoji prefix for a button label, keyed by the relay's device type.
type_emojis = dict(lamp='💡')

# Suffix appended after the device emoji, keyed by the action value.
# NOTE(review): both values are empty strings here, which makes the ON and OFF
# button labels built from them identical — confirm no characters were lost.
status_emoji = dict(on='', off='')
# MQTT client wrapper; declared here and assigned in the startup code below.
mqtt: MqttWrapper
# Loaded relay module for each configured node, keyed by node id.
relay_nodes: dict[str, Union[MqttRelayModule, MqttModule]] = {}
# Relay state per node id, updated from incoming diagnostics payloads.
relay_states: dict[str, MqttRelayState] = {}
class UserAction(Enum):
    """Actions a user can request for a relay.

    The member values double as keys into ``status_emoji``.
    """

    ON = 'on'
    OFF = 'off'
def on_mqtt_message(node: MqttNode,
                    message: MqttPayload):
    """Store diagnostics received from a node in ``relay_states``.

    Payloads that are neither ``InitialDiagnosticsPayload`` nor
    ``DiagnosticsPayload`` are ignored.
    """
    if not isinstance(message, (InitialDiagnosticsPayload, DiagnosticsPayload)):
        return

    state_fields = {'rssi': message.rssi, 'enabled': message.flags.state}
    # fw_version is only read from the initial diagnostics payload.
    if isinstance(message, InitialDiagnosticsPayload):
        state_fields['fw_version'] = message.fw_version

    # Create the state object on the first diagnostics message of a node.
    if node.id not in relay_states:
        relay_states[node.id] = MqttRelayState()
    relay_states[node.id].update(**state_fields)
async def enable_handler(node_id: str, ctx: bot.Context) -> None:
    """Call ``switchpower(True)`` on the node's relay module and reply ``done``."""
    relay = relay_nodes[node_id]
    relay.switchpower(True)
    confirmation = ctx.lang('done')
    await ctx.reply(confirmation)
async def disable_handler(node_id: str, ctx: bot.Context) -> None:
    """Call ``switchpower(False)`` on the node's relay module and reply ``done``."""
    relay = relay_nodes[node_id]
    relay.switchpower(False)
    confirmation = ctx.lang('done')
    await ctx.reply(confirmation)
async def start(ctx: bot.Context) -> None:
    """Reply with the localized ``start_message`` string."""
    greeting = ctx.lang('start_message')
    await ctx.reply(greeting)
@bot.exceptionhandler
async def exception_handler(e: Exception, ctx: bot.Context) -> bool:
    """Exception hook registered with the bot; unconditionally returns ``False``.

    NOTE(review): ``False`` presumably tells the framework the exception was
    not handled here — confirm against ``bot.exceptionhandler``.
    """
    return False
@bot.defaultreplymarkup
def markup(ctx: Optional[bot.Context]) -> Optional[ReplyKeyboardMarkup]:
    """Build the reply keyboard: one row per relay node, one button per action.

    NOTE(review): ``ctx`` is annotated as Optional, but ``ctx.user_lang`` is
    read unconditionally whenever at least one relay node is configured.
    """
    keyboard = []
    for node_id in config.app_config['relay_nodes']:
        device_type = mqtt_nodes_config.get_node(node_id)['relay']['device_type']
        prefix = type_emojis[device_type]

        row = []
        for action in UserAction:
            suffix = status_emoji[action.value]
            label = config.app_config.get_relay_name_translated(ctx.user_lang, node_id)
            row.append(f'{prefix}{suffix} {label}')
        keyboard.append(row)

    return ReplyKeyboardMarkup(keyboard, one_time_keyboard=False)
# Startup: register every configured relay node with the MQTT wrapper and
# bind one bot text handler per (node, action) pair.
mqtt = MqttWrapper(client_id='relay_mqtt_bot')
for node_id in config.app_config['relay_nodes']:
    node_data = mqtt_nodes_config.get_node(node_id)
    mqtt_node = MqttNode(node_id=node_id,
                         node_secret=node_data['password'])

    # 'legacy_topics' is optional in the node config; pass it on only when set.
    module_kwargs = {}
    try:
        if node_data['relay']['legacy_topics']:
            module_kwargs['legacy_topics'] = True
    except KeyError:
        pass

    relay_nodes[node_id] = mqtt_node.load_module('relay', **module_kwargs)
    mqtt_node.add_payload_callback(on_mqtt_message)
    mqtt.add_node(mqtt_node)

    type_emoji = type_emojis[node_data['relay']['device_type']]
    for action in UserAction:
        # The button text differs per language, so the handler is registered
        # for the label in every supported language.
        messages = [
            f'{type_emoji}{status_emoji[action.value]} '
            f'{config.app_config.get_relay_name_translated(_lang, node_id)}'
            for _lang in Translation.LANGUAGES
        ]
        handler = enable_handler if action is UserAction.ON else disable_handler
        # partial() binds node_id now, so each handler keeps its own node.
        bot.handler(texts=messages)(partial(handler, node_id))

mqtt.connect_and_loop(loop_forever=False)
try:
    bot.run(start_handler=start)
finally:
    # Disconnect from MQTT even when bot.run() exits with an exception.
    mqtt.disconnect()