#!/usr/bin/env python3
|
|
from enum import Enum
|
|
from typing import Optional
|
|
from telegram import ReplyKeyboardMarkup
|
|
from functools import partial
|
|
|
|
from home.config import config
|
|
from home.telegram import bot
|
|
from home.mqtt import MqttRelay, MqttRelayState
|
|
from home.mqtt.esp import MqttEspDevice
|
|
from home.mqtt.payload import MqttPayload
|
|
from home.mqtt.payload.relay import InitialDiagnosticsPayload, DiagnosticsPayload
|
|
|
|
|
|
# Load the configuration under the 'relay_mqtt_bot' name; the code below
# reads config['relays'] from it.
config.load('relay_mqtt_bot')

bot.initialize()

# Localized UI strings. The keys are looked up through ctx.lang(...) in the
# handlers below ('start_message', 'done').
bot.lang.ru(
    start_message="Выберите команду на клавиатуре",
    unknown_command="Неизвестная команда",
    done="Готово 👌",
)
bot.lang.en(
    start_message="Select command on the keyboard",
    unknown_command="Unknown command",
    done="Done 👌",
)
|
|
|
|
|
|
# Emoji prefix for a relay, keyed by the 'type' field of its config entry.
type_emojis: dict[str, str] = {
    'lamp': '💡',
}

# Emoji shown on a keyboard button, keyed by UserAction value ('on'/'off').
status_emoji: dict[str, str] = {
    'on': '✅',
    'off': '❌',
}
|
|
|
|
|
|
# MQTT relay client; stays None until the __main__ section creates it.
mqtt_relay: Optional[MqttRelay] = None

# Last reported state per relay, keyed by the home_id that
# on_mqtt_message receives.
relay_states: dict[str, MqttRelayState] = {}
|
|
|
|
|
|
class UserAction(Enum):
    """Actions a user can request for a relay.

    The values double as keys into ``status_emoji`` when button labels
    are built.
    """

    ON = 'on'
    OFF = 'off'
|
|
|
|
|
|
def on_mqtt_message(home_id, message: MqttPayload):
    """Store the diagnostics carried by an incoming MQTT payload.

    Payloads that are neither ``InitialDiagnosticsPayload`` nor
    ``DiagnosticsPayload`` are ignored.

    :param home_id: key under which the state is kept in ``relay_states``
    :param message: decoded payload passed in by the MQTT callback
    """
    if not isinstance(message, (InitialDiagnosticsPayload, DiagnosticsPayload)):
        return

    kwargs = dict(rssi=message.rssi, enabled=message.flags.state)
    # fw_version is only read from the initial diagnostics payload.
    if isinstance(message, InitialDiagnosticsPayload):
        kwargs['fw_version'] = message.fw_version

    # Create the state object lazily on the first report for this home_id.
    if home_id not in relay_states:
        relay_states[home_id] = MqttRelayState()
    relay_states[home_id].update(**kwargs)
|
|
|
|
|
|
def enable_handler(home_id: str, ctx: bot.Context) -> None:
    """Call ``set_power(home_id, True)`` and reply with the 'done' string.

    :param home_id: relay identifier, bound via ``partial`` at registration
    :param ctx: bot context of the message that triggered the handler
    """
    mqtt_relay.set_power(home_id, True)
    ctx.reply(ctx.lang('done'))
|
|
|
|
|
|
def disable_handler(home_id: str, ctx: bot.Context) -> None:
    """Call ``set_power(home_id, False)`` and reply with the 'done' string.

    :param home_id: relay identifier, bound via ``partial`` at registration
    :param ctx: bot context of the message that triggered the handler
    """
    mqtt_relay.set_power(home_id, False)
    ctx.reply(ctx.lang('done'))
|
|
|
|
|
|
def start(ctx: bot.Context) -> None:
    """Reply with the localized 'start_message' string.

    Passed to ``bot.run`` as ``start_handler``.
    """
    ctx.reply(ctx.lang('start_message'))
|
|
|
|
|
|
@bot.exceptionhandler
def exception_handler(e: Exception, ctx: bot.Context) -> bool:
    """Exception hook registered with the bot; always returns ``False``.

    NOTE(review): presumably ``False`` tells the framework the exception was
    not handled here -- confirm against ``bot.exceptionhandler``.
    """
    return False
|
|
|
|
|
|
@bot.defaultreplymarkup
def markup(ctx: Optional[bot.Context]) -> Optional[ReplyKeyboardMarkup]:
    """Build the default reply keyboard.

    The keyboard has one row per entry in ``config['relays']`` and one
    button per ``UserAction`` in each row. Button text uses the same
    ``<type emoji><status emoji> <label>`` format as the handler texts
    registered in the ``__main__`` section.

    NOTE(review): ``ctx`` is annotated Optional but ``ctx.user_lang`` is
    read unconditionally, so a ``None`` ctx raises AttributeError -- confirm
    whether the framework ever passes ``None`` here.
    """
    buttons = []
    for data in config['relays'].values():
        type_emoji = type_emojis[data['type']]
        # The label does not depend on the action, so look it up once per row.
        label = data['labels'][ctx.user_lang]
        buttons.append([
            f'{type_emoji}{status_emoji[action.value]} {label}'
            for action in UserAction
        ])
    return ReplyKeyboardMarkup(buttons, one_time_keyboard=False)
|
|
|
|
|
|
if __name__ == '__main__':
    devices = []
    for device_id, data in config['relays'].items():
        devices.append(MqttEspDevice(id=device_id,
                                     secret=data['secret']))

        # Register the relay's label as a language string keyed by device id.
        labels = data['labels']
        bot.lang.ru(**{device_id: labels['ru']})
        bot.lang.en(**{device_id: labels['en']})

        type_emoji = type_emojis[data['type']]

        # One handler per action; it matches the button text in every
        # configured language (same format as the keyboard in markup()).
        for action in UserAction:
            messages = [
                f'{type_emoji}{status_emoji[action.value]} {_label}'
                for _label in labels.values()
            ]
            handler = enable_handler if action == UserAction.ON else disable_handler
            bot.handler(texts=messages)(partial(handler, device_id))

    mqtt_relay = MqttRelay(devices=devices)
    mqtt_relay.set_message_callback(on_mqtt_message)
    mqtt_relay.configure_tls()
    mqtt_relay.connect_and_loop(loop_forever=False)

    # bot.enable_logging(BotType.RELAY_MQTT)
    try:
        bot.run(start_handler=start)
    finally:
        # Disconnect even when bot.run() exits with an exception.
        mqtt_relay.disconnect()
|