pump_bot: port to new config scheme and PTB 20

This commit is contained in:
Evgeny Zinoviev 2023-06-11 02:27:43 +03:00
parent 1d0b9c5d1c
commit eaab12b8f4
3 changed files with 113 additions and 70 deletions

View File

@ -1,27 +1,62 @@
#!/usr/bin/env python3
import __py_include
import sys
import asyncio
from enum import Enum
from typing import Optional
from typing import Optional, Union
from telegram import ReplyKeyboardMarkup, User
from time import time
from datetime import datetime
from homekit.config import config, is_development_mode
from homekit.config import config, is_development_mode, AppConfigUnit
from homekit.telegram import bot
from homekit.telegram.config import TelegramBotConfig, TelegramUserListType
from homekit.telegram._botutil import user_any_name
from homekit.relay.sunxi_h3_client import RelayClient
from homekit.mqtt import MqttNode, MqttWrapper, MqttPayload
from homekit.mqtt import MqttNode, MqttWrapper, MqttPayload, MqttNodesConfig, MqttModule
from homekit.mqtt.module.relay import MqttPowerStatusPayload, MqttRelayModule
from homekit.mqtt.module.temphum import MqttTemphumDataPayload
from homekit.mqtt.module.diagnostics import InitialDiagnosticsPayload, DiagnosticsPayload
config.load_app('pump_bot')
if __name__ != '__main__':
print(f'this script can not be imported as module', file=sys.stderr)
sys.exit(1)
mqtt_nodes_config = MqttNodesConfig()
class PumpBotUserListType(TelegramUserListType):
SILENT = 'silent_users'
class PumpBotConfig(AppConfigUnit, TelegramBotConfig):
NAME = 'pump_bot'
@classmethod
def schema(cls) -> Optional[dict]:
return {
**super(TelegramBotConfig).schema(),
PumpBotUserListType.SILENT: TelegramBotConfig._userlist_schema(),
'watering_relay_node': {'type': 'string'},
'pump_relay_addr': cls._addr_schema()
}
@staticmethod
def custom_validator(data):
relay_node_names = mqtt_nodes_config.get_nodes(filters=('relay',), only_names=True)
if data['watering_relay_node'] not in relay_node_names:
raise ValueError(f'unknown relay node "{data["watering_relay_node"]}"')
config.load_app(PumpBotConfig)
mqtt: MqttWrapper
mqtt_node: MqttNode
mqtt_relay_module: Union[MqttRelayModule, MqttModule]
mqtt: Optional[MqttWrapper] = None
mqtt_node: Optional[MqttNode] = None
mqtt_relay_module: Optional[MqttRelayModule] = None
time_format = '%d.%m.%Y, %H:%M:%S'
watering_mcu_status = {
@ -99,81 +134,89 @@ class UserAction(Enum):
def get_relay() -> RelayClient:
relay = RelayClient(host=config['relay']['ip'], port=config['relay']['port'])
relay = RelayClient(host=config.app_config['pump_relay_addr'].host,
port=config.app_config['pump_relay_addr'].port)
relay.connect()
return relay
def on(ctx: bot.Context, silent=False) -> None:
async def on(ctx: bot.Context, silent=False) -> None:
get_relay().on()
ctx.reply(ctx.lang('done'))
futures = [ctx.reply(ctx.lang('done'))]
if not silent:
notify(ctx.user, UserAction.ON)
futures.append(notify(ctx.user, UserAction.ON))
await asyncio.gather(*futures)
def off(ctx: bot.Context, silent=False) -> None:
async def off(ctx: bot.Context, silent=False) -> None:
get_relay().off()
ctx.reply(ctx.lang('done'))
futures = [ctx.reply(ctx.lang('done'))]
if not silent:
notify(ctx.user, UserAction.OFF)
futures.append(notify(ctx.user, UserAction.OFF))
await asyncio.gather(*futures)
def watering_on(ctx: bot.Context) -> None:
mqtt_relay_module.switchpower(True, config.get('mqtt_water_relay.secret'))
ctx.reply(ctx.lang('sent'))
notify(ctx.user, UserAction.WATERING_ON)
async def watering_on(ctx: bot.Context) -> None:
mqtt_relay_module.switchpower(True)
await asyncio.gather(
ctx.reply(ctx.lang('sent')),
notify(ctx.user, UserAction.WATERING_ON)
)
def watering_off(ctx: bot.Context) -> None:
mqtt_relay_module.switchpower(False, config.get('mqtt_water_relay.secret'))
ctx.reply(ctx.lang('sent'))
notify(ctx.user, UserAction.WATERING_OFF)
async def watering_off(ctx: bot.Context) -> None:
mqtt_relay_module.switchpower(False)
await asyncio.gather(
ctx.reply(ctx.lang('sent')),
notify(ctx.user, UserAction.WATERING_OFF)
)
def notify(user: User, action: UserAction) -> None:
async def notify(user: User, action: UserAction) -> None:
notification_key = 'user_watering_notification' if action in (UserAction.WATERING_ON, UserAction.WATERING_OFF) else 'user_action_notification'
def text_getter(lang: str):
action_name = bot.lang.get(f'user_action_{action.value}', lang)
user_name = user_any_name(user)
return ' ' + bot.lang.get(notification_key, lang,
user.id, user_name, action_name)
bot.notify_all(text_getter, exclude=(user.id,))
await bot.notify_all(text_getter, exclude=(user.id,))
@bot.handler(message='enable')
def enable_handler(ctx: bot.Context) -> None:
on(ctx)
async def enable_handler(ctx: bot.Context) -> None:
await on(ctx)
@bot.handler(message='enable_silently')
def enable_s_handler(ctx: bot.Context) -> None:
on(ctx, True)
async def enable_s_handler(ctx: bot.Context) -> None:
await on(ctx, True)
@bot.handler(message='disable')
def disable_handler(ctx: bot.Context) -> None:
off(ctx)
async def disable_handler(ctx: bot.Context) -> None:
await off(ctx)
@bot.handler(message='start_watering')
def start_watering(ctx: bot.Context) -> None:
watering_on(ctx)
async def start_watering(ctx: bot.Context) -> None:
await watering_on(ctx)
@bot.handler(message='stop_watering')
def stop_watering(ctx: bot.Context) -> None:
watering_off(ctx)
async def stop_watering(ctx: bot.Context) -> None:
await watering_off(ctx)
@bot.handler(message='disable_silently')
def disable_s_handler(ctx: bot.Context) -> None:
off(ctx, True)
async def disable_s_handler(ctx: bot.Context) -> None:
await off(ctx, True)
@bot.handler(message='status')
def status(ctx: bot.Context) -> None:
ctx.reply(
async def status(ctx: bot.Context) -> None:
await ctx.reply(
ctx.lang('enabled') if get_relay().status() == 'on' else ctx.lang('disabled')
)
@ -186,7 +229,7 @@ def _get_timestamp_as_string(timestamp: int) -> str:
@bot.handler(message='watering_status')
def watering_status(ctx: bot.Context) -> None:
async def watering_status(ctx: bot.Context) -> None:
buf = ''
if 0 < watering_mcu_status["last_time"] < time()-1800:
buf += '<b>WARNING! long time no reports from mcu! maybe something\'s wrong</b>\n'
@ -195,13 +238,13 @@ def watering_status(ctx: bot.Context) -> None:
buf += f'boot time: <b>{_get_timestamp_as_string(watering_mcu_status["last_boot_time"])}</b>\n'
buf += 'relay opened: <b>' + ('yes' if watering_mcu_status['relay_opened'] else 'no') + '</b>\n'
buf += f'ambient temp & humidity: <b>{watering_mcu_status["ambient_temp"]} °C, {watering_mcu_status["ambient_rh"]}%</b>'
ctx.reply(buf)
await ctx.reply(buf)
@bot.defaultreplymarkup
def markup(ctx: Optional[bot.Context]) -> Optional[ReplyKeyboardMarkup]:
buttons = []
if ctx.user_id in config['bot']['silent_users']:
if ctx.user_id in config.app_config.get_user_ids(PumpBotUserListType.SILENT):
buttons.append([ctx.lang('enable_silently'), ctx.lang('disable_silently')])
buttons.append([ctx.lang('enable'), ctx.lang('disable'), ctx.lang('status')],)
buttons.append([ctx.lang('start_watering'), ctx.lang('stop_watering'), ctx.lang('watering_status')])
@ -234,22 +277,21 @@ def mqtt_payload_callback(mqtt_node: MqttNode, payload: MqttPayload):
watering_mcu_status['relay_opened'] = payload.opened
if __name__ == '__main__':
mqtt = MqttWrapper()
mqtt_node = MqttNode(node_id=config.get('mqtt_water_relay.node_id'))
if is_development_mode():
mqtt_node.load_module('diagnostics')
mqtt = MqttWrapper(client_id='pump_bot')
mqtt_node = MqttNode(node_id=config.app_config['watering_relay_node'])
if is_development_mode():
mqtt_node.load_module('diagnostics')
mqtt_node.load_module('temphum')
mqtt_relay_module = mqtt_node.load_module('relay')
mqtt_node.load_module('temphum')
mqtt_relay_module = mqtt_node.load_module('relay')
mqtt_node.add_payload_callback(mqtt_payload_callback)
mqtt_node.add_payload_callback(mqtt_payload_callback)
mqtt.connect_and_loop(loop_forever=False)
mqtt.connect_and_loop(loop_forever=False)
bot.run()
bot.run()
try:
mqtt.disconnect()
except:
pass
try:
mqtt.disconnect()
except:
pass

View File

@ -10,8 +10,7 @@ from functools import partial
from homekit.config import config, AppConfigUnit, Translation
from homekit.telegram import bot
from homekit.telegram.config import TelegramBotConfig
from homekit.mqtt import MqttPayload, MqttNode, MqttWrapper, MqttModule
from homekit.mqtt import MqttNodesConfig
from homekit.mqtt import MqttPayload, MqttNode, MqttWrapper, MqttModule, MqttNodesConfig
from homekit.mqtt.module.relay import MqttRelayModule, MqttRelayState
from homekit.mqtt.module.diagnostics import InitialDiagnosticsPayload, DiagnosticsPayload

View File

@ -26,6 +26,7 @@ from ._botlang import lang, languages
from ._botdb import BotDatabase
from ._botutil import exc2text, IgnoreMarkup
from ._botcontext import Context
from .config import TelegramUserListType
db: Optional[BotDatabase] = None
@ -518,29 +519,30 @@ async def _default_any_handler(ctx: Context):
# _reporting.report(update.callback_query.message, text=update.callback_query.data)
def notify_all(text_getter: callable,
exclude: Tuple[int] = ()) -> None:
if 'notify_users' not in config['bot']:
_logger.error('notify_all() called but no notify_users directive found in the config')
async def notify_all(text_getter: callable,
exclude: Tuple[int] = ()) -> None:
notify_user_ids = config.app_config.get_user_ids(TelegramUserListType.NOTIFY)
if not notify_user_ids:
_logger.error('notify_all() called but no notify_users defined in the config')
return
for user_id in config['bot']['notify_users']:
for user_id in notify_user_ids:
if user_id in exclude:
continue
text = text_getter(db.get_user_lang(user_id))
_application.bot.send_message(chat_id=user_id,
text=text,
parse_mode='HTML')
await _application.bot.send_message(chat_id=user_id,
text=text,
parse_mode='HTML')
def notify_user(user_id: int, text: Union[str, Exception], **kwargs) -> None:
async def notify_user(user_id: int, text: Union[str, Exception], **kwargs) -> None:
if isinstance(text, Exception):
text = exc2text(text)
_application.bot.send_message(chat_id=user_id,
text=text,
parse_mode='HTML',
**kwargs)
await _application.bot.send_message(chat_id=user_id,
text=text,
parse_mode='HTML',
**kwargs)
def send_photo(user_id, **kwargs):