118 lines
3.4 KiB
Python
Executable File
118 lines
3.4 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
from enum import Enum
|
|
from typing import Optional
|
|
from telegram import ReplyKeyboardMarkup
|
|
from functools import partial
|
|
|
|
from home.config import config
|
|
from home.telegram import bot
|
|
from home.mqtt import MqttPayload, MqttNode, MqttWrapper
|
|
from home.mqtt.module.relay import MqttRelayModule, MqttRelayState
|
|
from home.mqtt.module.diagnostics import InitialDiagnosticsPayload, DiagnosticsPayload
|
|
|
|
|
|
# Load the configuration of the 'relay_mqtt_bot' application.
config.load_app('relay_mqtt_bot')

# Initialize the bot, then register the static UI strings for each language.
bot.initialize()

bot.lang.ru(**{
    'start_message': "Выберите команду на клавиатуре",
    'unknown_command': "Неизвестная команда",
    'done': "Готово 👌",
})
bot.lang.en(**{
    'start_message': "Select command on the keyboard",
    'unknown_command': "Unknown command",
    'done': "Done 👌",
})
|
|
|
|
|
|
# Emoji prefix for each relay `type` value used in the configuration.
type_emojis = {
    'lamp': '💡',
}

# Emoji shown on a button for each UserAction value.
status_emoji = {
    'on': '✅',
    'off': '❌',
}
|
|
|
|
|
|
# MQTT wrapper instance; stays None until the __main__ section creates it.
mqtt: Optional[MqttWrapper] = None

# Relay module of every configured device, keyed by device id.
relay_nodes: dict[str, MqttRelayModule] = {}

# Last known state per node id, updated from diagnostics payloads
# in on_mqtt_message().
relay_states: dict[str, MqttRelayState] = {}
|
|
|
|
|
|
class UserAction(Enum):
    """Actions a user can request for a relay.

    The values double as keys of ``status_emoji``.
    """

    ON = 'on'
    OFF = 'off'
|
|
|
|
|
|
def on_mqtt_message(node: MqttNode,
                    message: MqttPayload):
    """Store diagnostics reported by a node in ``relay_states``.

    Only ``InitialDiagnosticsPayload`` and ``DiagnosticsPayload`` messages
    are processed; any other payload is ignored.

    :param node: node the payload belongs to; ``node.id`` is the key used
        in ``relay_states``
    :param message: payload passed to the callback
    """
    if not isinstance(message, (InitialDiagnosticsPayload, DiagnosticsPayload)):
        return

    state_fields = {'rssi': message.rssi, 'enabled': message.flags.state}
    # fw_version is read only from the initial diagnostics payload.
    if isinstance(message, InitialDiagnosticsPayload):
        state_fields['fw_version'] = message.fw_version

    node_id = node.id
    # Create the state object lazily, on the first diagnostics message.
    if node_id not in relay_states:
        relay_states[node_id] = MqttRelayState()
    relay_states[node_id].update(**state_fields)
|
|
|
|
|
|
def enable_handler(node_id: str, ctx: bot.Context) -> None:
    """Switch the relay of ``node_id`` on and confirm to the user.

    :param node_id: key of the relay in ``relay_nodes``
    :param ctx: bot context used to send the localized 'done' reply
    """
    relay = relay_nodes[node_id]
    relay.switchpower(True)

    confirmation = ctx.lang('done')
    ctx.reply(confirmation)
|
|
|
|
|
|
def disable_handler(node_id: str, ctx: bot.Context) -> None:
    """Switch the relay of ``node_id`` off and confirm to the user.

    :param node_id: key of the relay in ``relay_nodes``
    :param ctx: bot context used to send the localized 'done' reply
    """
    relay = relay_nodes[node_id]
    relay.switchpower(False)

    confirmation = ctx.lang('done')
    ctx.reply(confirmation)
|
|
|
|
|
|
def start(ctx: bot.Context) -> None:
    """Reply with the localized ``start_message`` string.

    Passed to ``bot.run`` as ``start_handler``.
    """
    greeting = ctx.lang('start_message')
    ctx.reply(greeting)
|
|
|
|
|
|
@bot.exceptionhandler
def exception_handler(e: Exception, ctx: bot.Context) -> bool:
    """Exception hook registered with the bot; always returns False.

    NOTE(review): presumably False tells the bot the exception was not
    handled here -- confirm against ``bot.exceptionhandler``.
    """
    return False
|
|
|
|
|
|
@bot.defaultreplymarkup
def markup(ctx: Optional[bot.Context]) -> Optional[ReplyKeyboardMarkup]:
    """Build the default reply keyboard: one row of buttons per relay.

    Each row holds one button per ``UserAction``. A button is labelled with
    the relay's type emoji, the action's status emoji and the relay label in
    the user's language. The text format matches the handler texts
    registered in the ``__main__`` section.

    :param ctx: current bot context, or None
    :return: the keyboard, or None when there is no context to take the
        user's language from
    """
    # The signature allows ctx to be None; without a context the label
    # language is unknown, so no keyboard is produced instead of raising
    # AttributeError on ctx.user_lang.
    if ctx is None:
        return None

    user_lang = ctx.user_lang
    buttons = []
    for relay in config['relays'].values():
        label = relay['labels'][user_lang]
        type_emoji = type_emojis[relay['type']]
        buttons.append([f'{type_emoji}{status_emoji[action.value]} {label}'
                        for action in UserAction])
    return ReplyKeyboardMarkup(buttons, one_time_keyboard=False)
|
|
|
|
|
|
if __name__ == '__main__':
    mqtt = MqttWrapper()
    for device_id, data in config['relays'].items():
        # Create a node for each configured relay, keep its relay module
        # for the on/off handlers and subscribe to its payloads.
        mqtt_node = MqttNode(node_id=device_id, node_secret=data['secret'])
        relay_nodes[device_id] = mqtt_node.load_module('relay')
        mqtt_node.add_payload_callback(on_mqtt_message)
        mqtt.add_node(mqtt_node)

        # Register the device label as a language string keyed by device id.
        labels = data['labels']
        bot.lang.ru(**{device_id: labels['ru']})
        bot.lang.en(**{device_id: labels['en']})

        type_emoji = type_emojis[data['type']]

        for action in UserAction:
            # Register the button text in every configured language; the
            # format must stay identical to the one produced by markup().
            prefix = f'{type_emoji}{status_emoji[action.value]}'
            messages = [f'{prefix} {label}' for label in labels.values()]
            handler = enable_handler if action is UserAction.ON else disable_handler
            bot.handler(texts=messages)(partial(handler, device_id))

    mqtt.configure_tls()
    mqtt.connect_and_loop(loop_forever=False)

    # Disconnect from MQTT even if the bot loop exits with an exception.
    try:
        bot.run(start_handler=start)
    finally:
        mqtt.disconnect()
|