165 lines
4.9 KiB
Python
Executable File
165 lines
4.9 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
import sys
|
|
import include_homekit
|
|
|
|
from enum import Enum
|
|
from typing import Optional, Union
|
|
from telegram import ReplyKeyboardMarkup
|
|
from functools import partial
|
|
|
|
from homekit.config import config, AppConfigUnit, Translation
|
|
from homekit.telegram import bot
|
|
from homekit.telegram.config import TelegramBotConfig
|
|
from homekit.mqtt import MqttPayload, MqttNode, MqttWrapper, MqttModule, MqttNodesConfig
|
|
from homekit.mqtt.module.relay import MqttRelayModule, MqttRelayState
|
|
from homekit.mqtt.module.diagnostics import InitialDiagnosticsPayload, DiagnosticsPayload
|
|
|
|
|
|
# This file is an executable entry point only: bail out when it is imported.
if __name__ != '__main__':
    print('this script can not be imported as module', file=sys.stderr)
    raise SystemExit(1)


# Node configuration; used below to validate and look up the relay nodes.
mqtt_nodes_config = MqttNodesConfig()
|
|
|
|
|
|
class RelayMqttBotConfig(AppConfigUnit, TelegramBotConfig):
    """Configuration unit of the relay bot (config name ``relay_mqtt_bot``).

    Adds a required ``relay_nodes`` list of node names to the Telegram bot
    schema and looks up translated relay names in the ``mqtt_nodes``
    translation unit.
    """

    NAME = 'relay_mqtt_bot'

    # Translation unit created in __init__ from 'mqtt_nodes'.
    _strings: Translation

    def __init__(self):
        super().__init__()
        self._strings = Translation('mqtt_nodes')

    @classmethod
    def schema(cls) -> Optional[dict]:
        """Return the Telegram bot schema extended with ``relay_nodes``.

        ``relay_nodes`` is a required list of strings.
        """
        # The one-argument form ``super(TelegramBotConfig)`` creates an unbound
        # super object, which has no ``schema`` attribute and raises
        # AttributeError. Take the base schema from TelegramBotConfig directly.
        # NOTE(review): assumes TelegramBotConfig.schema is callable on the
        # class (classmethod/staticmethod) - confirm.
        return {
            **TelegramBotConfig.schema(),
            'relay_nodes': {
                'type': 'list',
                'required': True,
                'schema': {
                    'type': 'string',
                },
            },
        }

    @staticmethod
    def custom_validator(data):
        """Check that every configured relay node is a known relay node.

        :param data: loaded configuration; ``data['relay_nodes']`` is read.
        :raises ValueError: for the first name that is not among the names
            returned by ``mqtt_nodes_config.get_nodes`` with the ``relay``
            filter.
        """
        relay_node_names = mqtt_nodes_config.get_nodes(filters=('relay',), only_names=True)
        for node in data['relay_nodes']:
            if node not in relay_node_names:
                raise ValueError(f'unknown relay node "{node}"')

    def get_relay_name_translated(self, lang: str, relay_name: str) -> str:
        """Return the ``relay`` label of ``relay_name`` from the strings of ``lang``."""
        return self._strings.get(lang)[relay_name]['relay']
|
|
|
|
|
|
# Load the application configuration through this script's config unit.
config.load_app(RelayMqttBotConfig)

bot.initialize()

# User-facing bot strings; both languages define the same set of keys.
bot.lang.ru(
    start_message="Выберите команду на клавиатуре",
    unknown_command="Неизвестная команда",
    done="Готово 👌",
)
bot.lang.en(
    start_message="Select command on the keyboard",
    unknown_command="Unknown command",
    done="Done 👌",
)
|
|
|
|
|
|
# Emoji shown for a relay, keyed by the node's configured device type.
type_emojis = {'lamp': '💡'}

# Emoji shown for an action, keyed by the UserAction value.
status_emoji = {'on': '✅', 'off': '❌'}
|
|
|
|
|
|
# MQTT wrapper; only declared here, the instance is assigned further down.
mqtt: MqttWrapper

# Loaded relay modules, keyed by node id.
relay_nodes: dict[str, Union[MqttRelayModule, MqttModule]] = {}

# Last known relay state per node id, updated from diagnostics payloads.
relay_states: dict[str, MqttRelayState] = {}
|
|
|
|
|
|
class UserAction(Enum):
    """Actions a user can request for a relay.

    The values double as keys into ``status_emoji``.
    """

    ON = 'on'
    OFF = 'off'
|
|
|
|
|
|
def on_mqtt_message(node: MqttNode,
                    message: MqttPayload):
    """Store diagnostics data of ``node`` in ``relay_states``.

    Only ``InitialDiagnosticsPayload`` and ``DiagnosticsPayload`` messages are
    processed; any other payload is ignored. The state entry for ``node.id``
    is created on first use.
    """
    if not isinstance(message, (InitialDiagnosticsPayload, DiagnosticsPayload)):
        return

    state_fields = dict(rssi=message.rssi, enabled=message.flags.state)
    # Only the initial diagnostics payload is read for the firmware version.
    if isinstance(message, InitialDiagnosticsPayload):
        state_fields['fw_version'] = message.fw_version

    node_key = node.id
    if node_key not in relay_states:
        relay_states[node_key] = MqttRelayState()
    relay_states[node_key].update(**state_fields)
|
|
|
|
|
|
async def enable_handler(node_id: str, ctx: bot.Context) -> None:
    """Call ``switchpower(True)`` on the relay of ``node_id`` and reply with the ``done`` string."""
    relay = relay_nodes[node_id]
    relay.switchpower(True)
    confirmation = ctx.lang('done')
    await ctx.reply(confirmation)
|
|
|
|
|
|
async def disable_handler(node_id: str, ctx: bot.Context) -> None:
    """Call ``switchpower(False)`` on the relay of ``node_id`` and reply with the ``done`` string."""
    relay = relay_nodes[node_id]
    relay.switchpower(False)
    confirmation = ctx.lang('done')
    await ctx.reply(confirmation)
|
|
|
|
|
|
async def start(ctx: bot.Context) -> None:
    """Reply with the localized ``start_message`` string."""
    greeting = ctx.lang('start_message')
    await ctx.reply(greeting)
|
|
|
|
|
|
@bot.exceptionhandler
async def exception_handler(e: Exception, ctx: bot.Context) -> bool:
    """Exception hook registered with the bot; always returns ``False``."""
    # NOTE(review): presumably False tells the bot framework the exception was
    # not handled here - confirm against homekit.telegram.bot.
    return False
|
|
|
|
|
|
@bot.defaultreplymarkup
def markup(ctx: Optional[bot.Context]) -> Optional[ReplyKeyboardMarkup]:
    """Build the default reply keyboard.

    The keyboard has one row per configured relay node and one button per
    ``UserAction``. Each button reads
    ``<type emoji><action emoji> <translated relay name>``.
    """
    # NOTE(review): ctx is annotated Optional, yet ctx.user_lang is read for
    # every configured node - confirm callers never pass None.
    keyboard_rows = []
    for node_id in config.app_config['relay_nodes']:
        relay_info = mqtt_nodes_config.get_node(node_id)['relay']
        prefix = type_emojis[relay_info['device_type']]
        keyboard_rows.append([
            f'{prefix}{status_emoji[action.value]} {config.app_config.get_relay_name_translated(ctx.user_lang, node_id)}'
            for action in UserAction
        ])
    return ReplyKeyboardMarkup(keyboard_rows, one_time_keyboard=False)
|
|
|
|
|
|
# NOTE(review): `devices` is not referenced anywhere else in the visible script.
devices = []

mqtt = MqttWrapper(client_id='relay_mqtt_bot')
for node_id in config.app_config['relay_nodes']:
    node_data = mqtt_nodes_config.get_node(node_id)
    mqtt_node = MqttNode(node_id=node_id,
                         node_secret=node_data['password'])

    # A missing 'legacy_topics' key is treated the same as a false value.
    module_kwargs = {}
    try:
        if node_data['relay']['legacy_topics']:
            module_kwargs['legacy_topics'] = True
    except KeyError:
        pass

    relay_nodes[node_id] = mqtt_node.load_module('relay', **module_kwargs)
    mqtt_node.add_payload_callback(on_mqtt_message)
    mqtt.add_node(mqtt_node)

    type_emoji = type_emojis[node_data['relay']['device_type']]

    # Register one text handler per action. The handler matches the button
    # label in every language, so the texts follow the keyboard label format.
    for action in UserAction:
        handler = enable_handler if action == UserAction.ON else disable_handler
        messages = []
        for _lang in Translation.LANGUAGES:
            _label = config.app_config.get_relay_name_translated(_lang, node_id)
            messages.append(f'{type_emoji}{status_emoji[action.value]} {_label}')
        bot.handler(texts=messages)(partial(handler, node_id))

mqtt.connect_and_loop(loop_forever=False)

# Disconnect from MQTT even when the bot loop ends with an exception.
try:
    bot.run(start_handler=start)
finally:
    mqtt.disconnect()
|