homekit/src/pump_bot.py
2022-11-06 20:53:55 +03:00

132 lines
3.4 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
from enum import Enum
from typing import Optional
from telegram import ReplyKeyboardMarkup, User
from home.config import config
from home.telegram import bot
from home.telegram._botutil import user_any_name
from home.relay import RelayClient
from home.api.types import BotType
config.load('pump_bot')
bot.initialize()
bot.lang.ru(
start_message="Выберите команду на клавиатуре",
unknown_command="Неизвестная команда",
enable="Включить",
enable_silently="Включить тихо",
enabled="Включен ✅",
disable="Выключить",
disable_silently="Выключить тихо",
disabled="Выключен ❌",
status="Статус",
done="Готово 👌",
user_action_notification='Пользователь <a href="tg://user?id=%d">%s</a> <b>%s</b> насос.',
user_action_on="включил",
user_action_off="выключил",
)
bot.lang.en(
start_message="Select command on the keyboard",
unknown_command="Unknown command",
enable="Turn ON",
enable_silently="Turn ON silently",
enabled="Turned ON ✅",
disable="Turn OFF",
disable_silently="Turn OFF silently",
disabled="Turned OFF ❌",
status="Status",
done="Done 👌",
user_action_notification='User <a href="tg://user?id=%d">%s</a> turned the pump <b>%s</b>.',
user_action_on="ON",
user_action_off="OFF",
)
class UserAction(Enum):
ON = 'on'
OFF = 'off'
def get_relay() -> RelayClient:
relay = RelayClient(host=config['relay']['ip'], port=config['relay']['port'])
relay.connect()
return relay
def on(ctx: bot.Context, silent=False) -> None:
get_relay().on()
ctx.reply(ctx.lang('done'))
if not silent:
notify(ctx.user, UserAction.ON)
def off(ctx: bot.Context, silent=False) -> None:
get_relay().off()
ctx.reply(ctx.lang('done'))
if not silent:
notify(ctx.user, UserAction.OFF)
def notify(user: User, action: UserAction) -> None:
def text_getter(lang: str):
action_name = bot.lang.get(f'user_action_{action.value}', lang)
user_name = user_any_name(user)
return ' ' + bot.lang.get('user_action_notification', lang,
user.id, user_name, action_name)
bot.notify_all(text_getter, exclude=(user.id,))
@bot.handler(message='enable')
def enable_handler(ctx: bot.Context) -> None:
on(ctx)
@bot.handler(message='enable_silently')
def enable_s_handler(ctx: bot.Context) -> None:
on(ctx, True)
@bot.handler(message='disable')
def disable_handler(ctx: bot.Context) -> None:
off(ctx)
@bot.handler(message='disable_silently')
def disable_s_handler(ctx: bot.Context) -> None:
off(ctx, True)
@bot.handler(message='status')
def status(ctx: bot.Context) -> None:
ctx.reply(
ctx.lang('enabled') if get_relay().status() == 'on' else ctx.lang('disabled')
)
@bot.defaultreplymarkup
def markup(ctx: Optional[bot.Context]) -> Optional[ReplyKeyboardMarkup]:
buttons = [
[ctx.lang('enable'), ctx.lang('disable')],
]
if ctx.user_id in config['bot']['silent_users']:
buttons.append([ctx.lang('enable_silently'), ctx.lang('disable_silently')])
buttons.append([ctx.lang('status')])
return ReplyKeyboardMarkup(buttons, one_time_keyboard=False)
if __name__ == '__main__':
bot.enable_logging(BotType.PUMP)
bot.run()